phet commented on code in PR #3715:
URL: https://github.com/apache/gobblin/pull/3715#discussion_r1260476610


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java:
##########
@@ -32,6 +33,7 @@
 
 
 @ToString
+@Slf4j

Review Comment:
   just wondering... is the logging actually used?  otherwise, how did the 
prior version compile w/o either this annotation or an explicit field?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -54,6 +55,10 @@
  *            than epsilon and encapsulate executor communication latency 
including retry attempts
  *
  * The `event_timestamp` is the time of the flow_action event request.
+ * --- Note ---
+ * We only use the participant's local event_timestamp internally to identify 
the particular flow_action event, but
+ * after interacting with the database to the database utilize the 
CURRENT_TIMESTAMP of the database to insert or keep

Review Comment:
   looks like repeated "with/to the database"
   
   (and true that it avoids local/db time discrepancies, but the biggest 
concern is of clock drift between participants)



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -82,39 +87,43 @@ protected interface CheckedFunction<T, R> {
   private final int linger;
 
   // TODO: define retention on this table
-  private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %S ("
+  private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %s ("
       + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") 
NOT NULL, flow_name varchar("
       + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + 
"flow_execution_id varchar("
       + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, 
flow_action varchar(100) NOT NULL, "
       + "event_timestamp TIMESTAMP, "
-      + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+      + "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT '1970-01-02 
00:00:00', "

Review Comment:
   this choice of default seems questionable... what advantage does it have 
over just using `null`?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -137,79 +146,98 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws 
IOException {
     this.linger = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
     this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
+    String createArbiterStatement = String.format(
+        CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
     try (Connection connection = dataSource.getConnection();
-        PreparedStatement createStatement = 
connection.prepareStatement(String.format(
-            CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+        PreparedStatement createStatement = 
connection.prepareStatement(createArbiterStatement)) {
       createStatement.executeUpdate();
       connection.commit();
     } catch (SQLException e) {
       throw new IOException("Table creation failure for " + 
leaseArbiterTableName, e);
     }
-    withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT, 
this.constantsTableName, this.constantsTableName),
-        createStatement -> {
-      int i = 0;
-      createStatement.setInt(++i, epsilon);
-      createStatement.setInt(++i, linger);
-      return createStatement.executeUpdate();}, true);
+    String createConstantsStatement = 
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);

Review Comment:
   could be nice to abstract into a method, named for the high-level idea of 
what's going on over these 20 or so LoC



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -137,79 +146,98 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws 
IOException {
     this.linger = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
     this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
+    String createArbiterStatement = String.format(
+        CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
     try (Connection connection = dataSource.getConnection();
-        PreparedStatement createStatement = 
connection.prepareStatement(String.format(
-            CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+        PreparedStatement createStatement = 
connection.prepareStatement(createArbiterStatement)) {
       createStatement.executeUpdate();
       connection.commit();
     } catch (SQLException e) {
       throw new IOException("Table creation failure for " + 
leaseArbiterTableName, e);
     }
-    withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT, 
this.constantsTableName, this.constantsTableName),
-        createStatement -> {
-      int i = 0;
-      createStatement.setInt(++i, epsilon);
-      createStatement.setInt(++i, linger);
-      return createStatement.executeUpdate();}, true);
+    String createConstantsStatement = 
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+    withPreparedStatement(createConstantsStatement, createStatement -> 
createStatement.executeUpdate(), true);
+
+    int count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT, 
this.constantsTableName), getStatement -> {
+      ResultSet resultSet = getStatement.executeQuery();
+      if (resultSet.next()) {
+        return resultSet.getInt(1);
+      }
+      return -1;

Review Comment:
   how about `Optional<Integer>` rather than this sentinel value?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -219,84 +247,90 @@ else if (leaseValidityStatus == 2) {
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfMatchingAllStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfMatchingAllStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction);
       } // No longer leasing this event
-        // CASE 6: Same event, no longer leasing event in db: terminate
+        // CASE 5: Same event, no longer leasing event in db: terminate
         if (isWithinEpsilon) {
           return new NoLongerLeasingStatus();
         }
-        // CASE 7: Distinct event, no longer leasing event in db
+        // CASE 6: Distinct event, no longer leasing event in db
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfFinishedStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfFinishedStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   false, dbEventTimestamp, null);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
 
   /**
-   * Attempt lease by insert or update following a read based on the condition 
the state of the table has not changed
-   * since the read. Parse the result to return the corresponding status based 
on successful insert/update or not.
-   * @param resultSet
-   * @param eventTimeMillis
-   * @return LeaseAttemptStatus
+   * Parse result of attempted insert/update to obtain a lease for a
+   * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} event by 
selecting values corresponding to that
+   * event from the table to return the corresponding status based on 
successful insert/update or not.
    * @throws SQLException
    * @throws IOException
    */
-  protected LeaseAttemptStatus 
handleResultFromAttemptedLeaseObtainment(ResultSet resultSet,
-      DagActionStore.DagAction flowAction, long eventTimeMillis)
+  protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int 
numRowsUpdated,
+      DagActionStore.DagAction flowAction)
       throws SQLException, IOException {
-    if (!resultSet.next()) {
-      throw new IOException("Expected num rows and lease_acquisition_timestamp 
returned from query but received nothing");
-    }
-    int numRowsUpdated = resultSet.getInt(1);
-    long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
-    int dbLinger = resultSet.getInt(3);
+    // Fetch values in row after attempted insert
+    String formattedSelectAfterInsertStatement =
+        String.format(SELECT_AFTER_INSERT_STATEMENT, 
this.leaseArbiterTableName, this.constantsTableName);

Review Comment:
   wondering: is it truly necessary to bind the table name every time this 
query is made?  e.g. couldn't it be done once when the instance is constructed? 
 or might the table names ever change throughout the instance's lifetime?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -137,79 +146,98 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws 
IOException {
     this.linger = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
     this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
+    String createArbiterStatement = String.format(
+        CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
     try (Connection connection = dataSource.getConnection();
-        PreparedStatement createStatement = 
connection.prepareStatement(String.format(
-            CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+        PreparedStatement createStatement = 
connection.prepareStatement(createArbiterStatement)) {
       createStatement.executeUpdate();
       connection.commit();
     } catch (SQLException e) {
       throw new IOException("Table creation failure for " + 
leaseArbiterTableName, e);
     }
-    withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT, 
this.constantsTableName, this.constantsTableName),
-        createStatement -> {
-      int i = 0;
-      createStatement.setInt(++i, epsilon);
-      createStatement.setInt(++i, linger);
-      return createStatement.executeUpdate();}, true);
+    String createConstantsStatement = 
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+    withPreparedStatement(createConstantsStatement, createStatement -> 
createStatement.executeUpdate(), true);
+
+    int count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT, 
this.constantsTableName), getStatement -> {
+      ResultSet resultSet = getStatement.executeQuery();
+      if (resultSet.next()) {
+        return resultSet.getInt(1);
+      }
+      return -1;
+    }, true);
+
+    // Only insert epsilon and linger values from config if this table does 
not contain pre-existing values.
+    if (count == 0) {
+      String insertConstantsStatement = 
String.format(INSERT_IN_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+      withPreparedStatement(insertConstantsStatement, insertStatement -> {
+        int i = 0;
+        insertStatement.setInt(++i, epsilon);
+        insertStatement.setInt(++i, linger);
+        return insertStatement.executeUpdate();
+      }, true);
+    }
+
+    log.info("MysqlMultiActiveLeaseArbiter initialized");
   }
 
   @Override
   public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
     // Check table for an existing entry for this flow action and event time
-    ResultSet resultSet = withPreparedStatement(
+    GetEventInfoResult getResult = withPreparedStatement(
         String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName, 
this.constantsTableName),
         getInfoStatement -> {
           int i = 0;
-          getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis));
           getInfoStatement.setString(++i, flowAction.getFlowGroup());
           getInfoStatement.setString(++i, flowAction.getFlowName());
           getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
           getInfoStatement.setString(++i, 
flowAction.getFlowActionType().toString());
-          return getInfoStatement.executeQuery();
+          ResultSet resultSet = getInfoStatement.executeQuery();
+          if (!resultSet.next()) {
+            return null;
+          }
+          return new GetEventInfoResult(resultSet);
         }, true);
 
-    String formattedSelectAfterInsertStatement =
-        String.format(SELECT_AFTER_INSERT_STATEMENT, 
this.leaseArbiterTableName, this.constantsTableName);
     try {
       // CASE 1: If no existing row for this flow action, then go ahead and 
insert
-      if (!resultSet.next()) {
+      if (getResult == null) {
+        log.debug("CASE 1: no existing row for this flow action, then go ahead 
and insert");

Review Comment:
   may be worth writing only in `log.debug`, not also as a comment just above.  
[update: I see other cases lack logging.  I'd imagine tracing more helpful to 
debugging than a source comment.]
   
   also, I'm forgetting on the numbering... did each case receive a number 
somewhere previously, which you're referencing here (e.g. in a javadoc comment)?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -62,15 +63,15 @@
 public class FlowTriggerHandler {
   private final int schedulerMaxBackoffMillis;
   private static Random random = new Random();
-  protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+  protected Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter;
   protected SchedulerService schedulerService;
-  protected DagActionStore dagActionStore;
+  protected Optional<DagActionStore> dagActionStore;

Review Comment:
   great!  I strongly prefer this to `null` ;)
   
   even so, since the two must work together, the ultimate abstraction would be 
a Strategy interface w/ capability to persist the flow action and handle the 
trigger event.  in that approach, this ctor would pass these two optionals to a 
factory to get an appropriate instance of the strategy.  when both present, the 
factory would give a strategy using them both, but when they're both missing, a 
"null strategy" would be returned.  the factory would throw if only one were 
given.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -82,39 +87,43 @@ protected interface CheckedFunction<T, R> {
   private final int linger;
 
   // TODO: define retention on this table
-  private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %S ("
+  private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %s ("
       + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") 
NOT NULL, flow_name varchar("
       + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + 
"flow_execution_id varchar("
       + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, 
flow_action varchar(100) NOT NULL, "
       + "event_timestamp TIMESTAMP, "
-      + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+      + "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT '1970-01-02 
00:00:00', "
       + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
   private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE 
IF NOT EXISTS %s "
-      + "(epsilon INT, linger INT), PRIMARY KEY (epsilon, linger); INSERT INTO 
%s (epsilon, linger) VALUES (?,?)";
+      + "(epsilon INT, linger INT, PRIMARY KEY (epsilon, linger))";
+  private static final String GET_ROW_COUNT_STATEMENT = "SELECT COUNT(*) FROM 
%s";
+  private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT 
INTO %s (epsilon, linger) VALUES (?,?)";
   protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE 
flow_group=? AND flow_name=? AND flow_execution_id=?"
       + " AND flow_action=?";
   protected static final String WHERE_CLAUSE_TO_MATCH_ROW = 
WHERE_CLAUSE_TO_MATCH_KEY
       + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
-  protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT 
ROW_COUNT() AS rows_inserted_count, "
-      + "lease_acquisition_timestamp, linger FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+  protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT 
event_timestamp, lease_acquisition_timestamp, "
+    + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
   // Does a cross join between the two tables to have epsilon and linger 
values available. Returns the following values:
-  // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if 
event_timestamp in table is within
-  // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired, 
3 if column is NULL or no longer leasing)
+  // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if 
current time in db is within epsilon of
+  // event_timestamp), leaseValidityStatus (1 if lease has not expired, 2 if 
expired, 3 if column is NULL or no longer
+  // leasing)
   protected static final String GET_EVENT_INFO_STATEMENT = "SELECT 
event_timestamp, lease_acquisition_timestamp, "
-      + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
-      + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then 
1"
-      + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then 
2"
-      + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+      + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP) / 1000 
<= epsilon as isWithinEpsilon, CASE "
+      + "WHEN CURRENT_TIMESTAMP < DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 1 "
+      + "WHEN CURRENT_TIMESTAMP >= DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 2 "
+      + "ELSE 3 END as leaseValidityStatus, linger, CURRENT_TIMESTAMP FROM %s, 
%s " + WHERE_CLAUSE_TO_MATCH_KEY;

Review Comment:
   I don't know enough about sql's evaluation rules to be certain whether the 
multiple invocations of `CURRENT_TIMESTAMP` all resolve to a common evaluation 
or would be invoked multiple times (and if so, whether in a defined order or 
not). do you know?
   
   if this leads to multiple evaluations with no ordering guarantee, would all 
logic remain internally consistent and provide an atomic view across multiple 
fields?
   
   if not, the alternative would be to evaluate `CURRENT_TIMESTAMP` only once 
in an inner (nested) query.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -384,8 +411,60 @@ protected <T> T withPreparedStatement(String sql, 
CheckedFunction<PreparedStatem
       }
       return result;
     } catch (SQLException e) {
-      log.warn("Received SQL exception that can result from invalid 
connection. Checking if validation query is set {} Exception is {}", 
((HikariDataSource) this.dataSource).getConnectionTestQuery(), e);
+      log.warn("Received SQL exception that can result from invalid 
connection. Checking if validation query is set {} "
+          + "Exception is {}", ((HikariDataSource) 
this.dataSource).getConnectionTestQuery(), e);
       throw new IOException(e);
     }
   }
+
+  @Data
+  /*
+  Class used to extract information from initial SELECT query resultSet to be 
used for understanding the state of the
+  flow action event's lease in the arbiter store and act accordingly.
+  */

Review Comment:
   minor, but
   a. make this class javadoc via `/**`
   b. I usually see annotations following class-level comments, not before the 
comments



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -384,8 +411,60 @@ protected <T> T withPreparedStatement(String sql, 
CheckedFunction<PreparedStatem
       }
       return result;
     } catch (SQLException e) {
-      log.warn("Received SQL exception that can result from invalid 
connection. Checking if validation query is set {} Exception is {}", 
((HikariDataSource) this.dataSource).getConnectionTestQuery(), e);
+      log.warn("Received SQL exception that can result from invalid 
connection. Checking if validation query is set {} "
+          + "Exception is {}", ((HikariDataSource) 
this.dataSource).getConnectionTestQuery(), e);
       throw new IOException(e);
     }
   }
+
+  @Data
+  /*
+  Class used to extract information from initial SELECT query resultSet to be 
used for understanding the state of the
+  flow action event's lease in the arbiter store and act accordingly.
+  */
+  class GetEventInfoResult {
+    private Timestamp dbEventTimestamp;
+    private Timestamp dbLeaseAcquisitionTimestamp;
+    private boolean withinEpsilon;
+    private int leaseValidityStatus;
+    private int dbLinger;
+    private Timestamp dbCurrentTimestamp;
+
+    GetEventInfoResult(ResultSet resultSet) {

Review Comment:
   nit, but rather than defining a variant constructor, this would be better as 
a factory method:
   ```
   public static GetEventInfoResult fromResultSet(ResultSet rs);
   ```



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -137,79 +146,98 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws 
IOException {
     this.linger = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
     this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
+    String createArbiterStatement = String.format(
+        CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
     try (Connection connection = dataSource.getConnection();
-        PreparedStatement createStatement = 
connection.prepareStatement(String.format(
-            CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+        PreparedStatement createStatement = 
connection.prepareStatement(createArbiterStatement)) {
       createStatement.executeUpdate();
       connection.commit();
     } catch (SQLException e) {
       throw new IOException("Table creation failure for " + 
leaseArbiterTableName, e);
     }
-    withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT, 
this.constantsTableName, this.constantsTableName),
-        createStatement -> {
-      int i = 0;
-      createStatement.setInt(++i, epsilon);
-      createStatement.setInt(++i, linger);
-      return createStatement.executeUpdate();}, true);
+    String createConstantsStatement = 
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+    withPreparedStatement(createConstantsStatement, createStatement -> 
createStatement.executeUpdate(), true);
+
+    int count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT, 
this.constantsTableName), getStatement -> {
+      ResultSet resultSet = getStatement.executeQuery();
+      if (resultSet.next()) {
+        return resultSet.getInt(1);
+      }
+      return -1;
+    }, true);
+
+    // Only insert epsilon and linger values from config if this table does 
not contain pre-existing values.
+    if (count == 0) {
+      String insertConstantsStatement = 
String.format(INSERT_IN_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+      withPreparedStatement(insertConstantsStatement, insertStatement -> {
+        int i = 0;
+        insertStatement.setInt(++i, epsilon);
+        insertStatement.setInt(++i, linger);
+        return insertStatement.executeUpdate();
+      }, true);
+    }
+
+    log.info("MysqlMultiActiveLeaseArbiter initialized");
   }
 
   @Override
   public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
     // Check table for an existing entry for this flow action and event time
-    ResultSet resultSet = withPreparedStatement(
+    GetEventInfoResult getResult = withPreparedStatement(
         String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName, 
this.constantsTableName),
         getInfoStatement -> {
           int i = 0;
-          getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis));
           getInfoStatement.setString(++i, flowAction.getFlowGroup());
           getInfoStatement.setString(++i, flowAction.getFlowName());
           getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
           getInfoStatement.setString(++i, 
flowAction.getFlowActionType().toString());
-          return getInfoStatement.executeQuery();
+          ResultSet resultSet = getInfoStatement.executeQuery();
+          if (!resultSet.next()) {
+            return null;

Review Comment:
   `Optional` is nice for the compile-time reminder to explicitly consider 
`null` (`Optional.absent`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to