phet commented on code in PR #3715:
URL: https://github.com/apache/gobblin/pull/3715#discussion_r1263117784


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -85,14 +87,16 @@ protected interface CheckedFunction<T, R> {
   private final String constantsTableName;
   private final int epsilon;
   private final int linger;
+  String formattedGetInfoStatement;
+  String formattedSelectAfterInsertStatement;

Review Comment:
   package-private access is generally not needed.  could these be `private`?
   
   suggestion on naming: `thisTableGetInfoStatement` (since the table name is already set)



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -275,6 +287,34 @@ else if (leaseValidityStatus == 2) {
     }
   }
 
+  protected Optional<GetEventInfoResult> createGetInfoResult(ResultSet 
resultSet) {
+    try {
+      // Extract values from result set
+      Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("lease_acquisition_timestamp");
+      boolean withinEpsilon = resultSet.getBoolean("isWithinEpsilon");
+      int leaseValidityStatus = resultSet.getInt("leaseValidityStatus");
+      int dbLinger = resultSet.getInt("linger");
+      Timestamp dbCurrentTimestamp = 
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+      return Optional.of(new GetEventInfoResult(dbEventTimestamp, 
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+          dbLinger, dbCurrentTimestamp));
+    } catch (SQLException exception) {
+      log.warn("Failed to retrieve values from GET event info query resultSet. 
Exception: ", exception);
+      // Note: this will proceed to CASE 1 of acquiring a lease above
+      return Optional.absent();

Review Comment:
   I wouldn't expect it to be a good idea to swallow a `SQLException` here... 
e.g. if a column name is misspelled or there's a type mismatch in a 
`resultSet.getXYZ` accessor above, shouldn't that be a hard failure?
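
A minimal sketch of the hard-failure alternative, assuming the extraction method is changed to declare `throws SQLException` so the caller decides how to fail. `EventInfo` and `readEventInfo` are hypothetical, trimmed-down stand-ins for `GetEventInfoResult` and `createGetInfoResult`, and the proxy-based `failingResultSet` exists only to simulate a bad column name without a database:

```java
import java.lang.reflect.Proxy;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;

public class PropagateSqlExceptionSketch {
  /** Hypothetical stand-in for GetEventInfoResult; only two of the columns are shown. */
  static final class EventInfo {
    final Timestamp eventTimestamp;
    final int leaseValidityStatus;

    EventInfo(Timestamp eventTimestamp, int leaseValidityStatus) {
      this.eventTimestamp = eventTimestamp;
      this.leaseValidityStatus = leaseValidityStatus;
    }
  }

  /** No try/catch: a misspelled column or a type mismatch surfaces to the caller. */
  static EventInfo readEventInfo(ResultSet resultSet) throws SQLException {
    Timestamp eventTimestamp = resultSet.getTimestamp("event_timestamp");
    int leaseValidityStatus = resultSet.getInt("leaseValidityStatus");
    return new EventInfo(eventTimestamp, leaseValidityStatus);
  }

  /** Simulation helper: a ResultSet whose accessors all throw SQLException. */
  static ResultSet failingResultSet() {
    return (ResultSet) Proxy.newProxyInstance(
        PropagateSqlExceptionSketch.class.getClassLoader(),
        new Class<?>[] {ResultSet.class},
        (proxy, method, methodArgs) -> {
          throw new SQLException("Column not found");
        });
  }

  /** Returns true when the accessor failure reaches the caller instead of being swallowed. */
  static boolean failurePropagates() {
    try {
      readEventInfo(failingResultSet());
      return false;
    } catch (SQLException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("failure propagates: " + failurePropagates());
  }
}
```

With this shape, the lease-acquisition path would no longer fall through to CASE 1 on a broken query; whether to rethrow as `IOException` (as the constructor already does for table creation) is left to the caller.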



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -155,19 +163,26 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws 
IOException {
     } catch (SQLException e) {
       throw new IOException("Table creation failure for " + 
leaseArbiterTableName, e);
     }
+    initializeConstantsTable();
+
+    log.info("MysqlMultiActiveLeaseArbiter initialized");
+  }
+
+  // Initialize Constants table if needed and insert row into it if one does 
not exist
+  private void initializeConstantsTable() throws IOException {
     String createConstantsStatement = 
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
     withPreparedStatement(createConstantsStatement, createStatement -> 
createStatement.executeUpdate(), true);
 
-    int count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT, 
this.constantsTableName), getStatement -> {
+    Optional<Integer> count = 
withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT, 
this.constantsTableName), getStatement -> {
       ResultSet resultSet = getStatement.executeQuery();
       if (resultSet.next()) {
-        return resultSet.getInt(1);
+        return Optional.of(resultSet.getInt(1));
       }
-      return -1;
+      return Optional.absent();
     }, true);
 
     // Only insert epsilon and linger values from config if this table does 
not contain pre-existing values.
-    if (count == 0) {
+    if (count.isPresent() && count.get() == 0) {

Review Comment:
   with multiple participants, use of "check, then set" is error-prone.  e.g. 
two participants could each read a count of 0 and then both insert, leaving two or more rows.
   
   the algo requires there to only ever be *exactly one* row.  as this is 
essential, add a primary key field that should only ever have one value (== 1). 
 every INSERT should hard-code that value.  to seamlessly handle both 
first-time init and updates when the config settings have changed, 
utilize `INSERT... ON DUPLICATE KEY UPDATE`.  see: 
https://stackoverflow.com/a/1361368
   
   *EVERY PARTICIPANT* would then "upsert" when it starts up, where the most 
recent value wins, clobbering the former.  even so, since all participants 
generally have uniform config, most of these updates wouldn't actually change 
any values.
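
A minimal sketch of such an upsert statement, assuming a constants table with `epsilon` and `linger` columns plus a single-valued primary key. The column name `primary_key`, the class name, and the table name in `main` are all hypothetical, not taken from the PR:

```java
public class ConstantsUpsertSketch {
  // Assumed schema: `primary_key` is a PRIMARY KEY column that is always written as 1,
  // so a second INSERT collides with the first and takes the UPDATE branch instead.
  static final String UPSERT_CONSTANTS_STATEMENT =
      "INSERT INTO %s (primary_key, epsilon, linger) VALUES (1, ?, ?) "
          + "ON DUPLICATE KEY UPDATE epsilon = VALUES(epsilon), linger = VALUES(linger)";

  /** Fills in the table name, mirroring how the other statements are formatted. */
  static String upsertStatementFor(String constantsTableName) {
    return String.format(UPSERT_CONSTANTS_STATEMENT, constantsTableName);
  }

  public static void main(String[] args) {
    // Hypothetical table name, for illustration only.
    System.out.println(upsertStatementFor("example_constants_table"));
  }
}
```

Each participant would bind its configured values with `PreparedStatement.setInt(1, epsilon)` and `setInt(2, linger)` and execute this once at startup, which replaces the row-count check entirely. Note that newer MySQL versions deprecate the `VALUES(col)` form in favor of a row alias, so the exact `UPDATE` clause may need adjusting.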



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -430,41 +468,31 @@ class GetEventInfoResult {
     private int dbLinger;
     private Timestamp dbCurrentTimestamp;
 
-    GetEventInfoResult(ResultSet resultSet) {
-      try {
-        // Extract values from result set
-        dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
-        dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("lease_acquisition_timestamp");
-        withinEpsilon = resultSet.getBoolean("isWithinEpsilon");
-        leaseValidityStatus = resultSet.getInt("leaseValidityStatus");
-        dbLinger = resultSet.getInt("linger");
-        dbCurrentTimestamp = resultSet.getTimestamp("CURRENT_TIMESTAMP");
-      } catch (SQLException exception) {
-        log.warn("Failed to retrieve values from GET event info query 
resultSet. Exception: ", exception);
-      }
+    GetEventInfoResult(Timestamp eventTimestamp, Timestamp 
leaseAcquisitionTimestamp, boolean isWithinEpsilon,
+        int validityStatus, int linger, Timestamp currentTimestamp) {
+      // Extract values from result set
+      dbEventTimestamp = eventTimestamp;
+      dbLeaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
+      withinEpsilon = isWithinEpsilon;
+      leaseValidityStatus = validityStatus;
+      dbLinger = linger;
+      dbCurrentTimestamp = currentTimestamp;
     }
   }
 
-  @Data
-  /*
-   Class used to extract information from SELECT query used to determine 
status of lease acquisition attempt.
+  /**
+   Class used to store information from SELECT query used to determine status 
of lease acquisition attempt.

Review Comment:
   also, I don't believe anything is *stored* here (i.e. recorded durably)



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -214,31 +226,31 @@ public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagAction flowAction, l
       }
 
       // Extract values from result set
-      Timestamp dbEventTimestamp = getResult.getDbEventTimestamp();
-      Timestamp dbLeaseAcquisitionTimestamp = 
getResult.getDbLeaseAcquisitionTimestamp();
-      boolean isWithinEpsilon = getResult.isWithinEpsilon();
-      int leaseValidityStatus = getResult.getLeaseValidityStatus();
-      int dbLinger = getResult.getDbLinger();
-      Timestamp dbCurrentTimestamp = getResult.getDbCurrentTimestamp();
+      Timestamp dbEventTimestamp = getResult.get().getDbEventTimestamp();
+      Timestamp dbLeaseAcquisitionTimestamp = 
getResult.get().getDbLeaseAcquisitionTimestamp();
+      boolean isWithinEpsilon = getResult.get().isWithinEpsilon();
+      int leaseValidityStatus = getResult.get().getLeaseValidityStatus();
+      int dbLinger = getResult.get().getDbLinger();
+      Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
 
       log.info("Multi-active arbiter replacing local trigger event timestamp 
with database one {}: "
           + "[{}, triggerEventTimestamp: {}]",  dbCurrentTimestamp, 
flowAction, eventTimeMillis);
 
       // Lease is valid
       if (leaseValidityStatus == 1) {
-        // CASE 2: Same event, lease is valid
         if (isWithinEpsilon) {
+          log.debug("CASE 2: Same event, lease is valid");

Review Comment:
   we probably need to set log context, or else we'll find a bunch of such msgs 
w/ no way to correlate which flow action each relates to



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -275,6 +287,34 @@ else if (leaseValidityStatus == 2) {
     }
   }
 
+  protected Optional<GetEventInfoResult> createGetInfoResult(ResultSet 
resultSet) {
+    try {
+      // Extract values from result set
+      Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("lease_acquisition_timestamp");
+      boolean withinEpsilon = resultSet.getBoolean("isWithinEpsilon");
+      int leaseValidityStatus = resultSet.getInt("leaseValidityStatus");

Review Comment:
   seems we mix `snake_case` and `camelCase` for the column names



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -417,11 +454,12 @@ protected <T> T withPreparedStatement(String sql, 
CheckedFunction<PreparedStatem
     }
   }
 
-  @Data
-  /*
-  Class used to extract information from initial SELECT query resultSet to be 
used for understanding the state of the
+
+  /**
+  Class used to store information from initial SELECT query resultSet to be 
used for understanding the state of the

Review Comment:
   truly, I don't find this *storing* anything.  previously you said *extract* 
which is closer.
   
   this being a DTO, it's closer to say, "Class to *convey* info...".
   
   (personally, I'd write
   > DTO for arbiter's current lease state for a `FlowActionEvent`
   
   )



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
