phet commented on code in PR #3715:
URL: https://github.com/apache/gobblin/pull/3715#discussion_r1263117784
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -85,14 +87,16 @@ protected interface CheckedFunction<T, R> {
private final String constantsTableName;
private final int epsilon;
private final int linger;
+ String formattedGetInfoStatement;
+ String formattedSelectAfterInsertStatement;
Review Comment:
Package-private visibility is generally not needed. Could these be `private`?
Naming suggestion: `thisTableGetInfoStatement` (since the table name is
already filled in).
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -275,6 +287,34 @@ else if (leaseValidityStatus == 2) {
}
}
+ protected Optional<GetEventInfoResult> createGetInfoResult(ResultSet
resultSet) {
+ try {
+ // Extract values from result set
+ Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+ Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("lease_acquisition_timestamp");
+ boolean withinEpsilon = resultSet.getBoolean("isWithinEpsilon");
+ int leaseValidityStatus = resultSet.getInt("leaseValidityStatus");
+ int dbLinger = resultSet.getInt("linger");
+ Timestamp dbCurrentTimestamp =
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+ return Optional.of(new GetEventInfoResult(dbEventTimestamp,
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+ dbLinger, dbCurrentTimestamp));
+ } catch (SQLException exception) {
+ log.warn("Failed to retrieve values from GET event info query resultSet.
Exception: ", exception);
+ // Note: this will proceed to CASE 1 of acquiring a lease above
+ return Optional.absent();
Review Comment:
I wouldn't expect swallowing a `SQLException` here to be a good idea.
E.g. if a column name is misspelled, or a `resultSet.getXYZ` accessor above
uses a mismatched type, shouldn't that be a hard failure?
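
A minimal sketch of the hard-failure alternative, not the PR's code. The
`IntColumnReader` interface and `LeaseInfoExtraction` class are hypothetical
stand-ins (so the snippet runs without a database), and wrapping in
`IOException` is an assumption based on how the constructor already reports
table creation failures:

```java
import java.io.IOException;
import java.sql.SQLException;

// Hypothetical stand-in for the one ResultSet accessor used below.
interface IntColumnReader {
  int getInt(String columnLabel) throws SQLException;
}

class LeaseInfoExtraction {
  // Reads one column; a bad column name or type mismatch surfaces as a failure
  // instead of being logged and turned into Optional.absent().
  static int readLeaseValidityStatus(IntColumnReader row) throws IOException {
    try {
      return row.getInt("leaseValidityStatus");
    } catch (SQLException e) {
      // Rethrow with context so the caller cannot silently fall through to CASE 1
      throw new IOException("Failed to read GET event info query result", e);
    }
  }
}
```

With this shape, the caller of `tryAcquireLease` sees the failure rather than
proceeding as if no lease row existed.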
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -155,19 +163,26 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws
IOException {
} catch (SQLException e) {
throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
}
+ initializeConstantsTable();
+
+ log.info("MysqlMultiActiveLeaseArbiter initialized");
+ }
+
+ // Initialize Constants table if needed and insert row into it if one does
not exist
+ private void initializeConstantsTable() throws IOException {
String createConstantsStatement =
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
withPreparedStatement(createConstantsStatement, createStatement ->
createStatement.executeUpdate(), true);
- int count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT,
this.constantsTableName), getStatement -> {
+ Optional<Integer> count =
withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT,
this.constantsTableName), getStatement -> {
ResultSet resultSet = getStatement.executeQuery();
if (resultSet.next()) {
- return resultSet.getInt(1);
+ return Optional.of(resultSet.getInt(1));
}
- return -1;
+ return Optional.absent();
}, true);
// Only insert epsilon and linger values from config if this table does
not contain pre-existing values.
- if (count == 0) {
+ if (count.isPresent() && count.get() == 0) {
Review Comment:
With multiple participants, "check, then set" is error-prone: e.g. two or
more rows could be inserted.
The algo requires there to only ever be *exactly one* row. As this is
essential, add a primary key field that should only ever have one value (== 1).
Every INSERT should hard-code that value. To seamlessly handle both
first-time init and updating when the config settings have changed,
use `INSERT ... ON DUPLICATE KEY UPDATE`. See:
https://stackoverflow.com/a/1361368
*EVERY PARTICIPANT* would then "upsert" when it starts up, where the most
recent value wins, clobbering the former. Even so, since all participants
generally have uniform config, most upserts won't actually change the stored
values.
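
A rough sketch of such an upsert statement. The `primary_key` and `epsilon`
column names, the constant name, and the `ConstantsTableUpsert` class are
assumptions for illustration, and `primary_key` would need to be declared as
the table's PRIMARY KEY:

```java
class ConstantsTableUpsert {
  // Assumption: hypothetical `primary_key` column; every INSERT hard-codes the value 1,
  // so a second participant hits the duplicate key and updates instead of adding a row.
  static final String UPSERT_CONSTANTS_TABLE_STATEMENT =
      "INSERT INTO %s (primary_key, epsilon, linger) VALUES (1, ?, ?) "
      + "ON DUPLICATE KEY UPDATE epsilon = VALUES(epsilon), linger = VALUES(linger)";

  // Fills in the table name once; epsilon and linger are bound later as statement parameters.
  static String formatUpsert(String constantsTableName) {
    return String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, constantsTableName);
  }
}
```

Note that MySQL 8.0.20 and later deprecate `VALUES()` in this position in
favor of row aliases, so the exact form may depend on the server version.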
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -430,41 +468,31 @@ class GetEventInfoResult {
private int dbLinger;
private Timestamp dbCurrentTimestamp;
- GetEventInfoResult(ResultSet resultSet) {
- try {
- // Extract values from result set
- dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
- dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("lease_acquisition_timestamp");
- withinEpsilon = resultSet.getBoolean("isWithinEpsilon");
- leaseValidityStatus = resultSet.getInt("leaseValidityStatus");
- dbLinger = resultSet.getInt("linger");
- dbCurrentTimestamp = resultSet.getTimestamp("CURRENT_TIMESTAMP");
- } catch (SQLException exception) {
- log.warn("Failed to retrieve values from GET event info query
resultSet. Exception: ", exception);
- }
+ GetEventInfoResult(Timestamp eventTimestamp, Timestamp
leaseAcquisitionTimestamp, boolean isWithinEpsilon,
+ int validityStatus, int linger, Timestamp currentTimestamp) {
+ // Extract values from result set
+ dbEventTimestamp = eventTimestamp;
+ dbLeaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
+ withinEpsilon = isWithinEpsilon;
+ leaseValidityStatus = validityStatus;
+ dbLinger = linger;
+ dbCurrentTimestamp = currentTimestamp;
}
}
- @Data
- /*
- Class used to extract information from SELECT query used to determine
status of lease acquisition attempt.
+ /**
+ Class used to store information from SELECT query used to determine status
of lease acquisition attempt.
Review Comment:
Here too, I don't believe anything is *stored* (i.e. recorded durably).
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -214,31 +226,31 @@ public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction flowAction, l
}
// Extract values from result set
- Timestamp dbEventTimestamp = getResult.getDbEventTimestamp();
- Timestamp dbLeaseAcquisitionTimestamp =
getResult.getDbLeaseAcquisitionTimestamp();
- boolean isWithinEpsilon = getResult.isWithinEpsilon();
- int leaseValidityStatus = getResult.getLeaseValidityStatus();
- int dbLinger = getResult.getDbLinger();
- Timestamp dbCurrentTimestamp = getResult.getDbCurrentTimestamp();
+ Timestamp dbEventTimestamp = getResult.get().getDbEventTimestamp();
+ Timestamp dbLeaseAcquisitionTimestamp =
getResult.get().getDbLeaseAcquisitionTimestamp();
+ boolean isWithinEpsilon = getResult.get().isWithinEpsilon();
+ int leaseValidityStatus = getResult.get().getLeaseValidityStatus();
+ int dbLinger = getResult.get().getDbLinger();
+ Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
log.info("Multi-active arbiter replacing local trigger event timestamp
with database one {}: "
+ "[{}, triggerEventTimestamp: {}]", dbCurrentTimestamp,
flowAction, eventTimeMillis);
// Lease is valid
if (leaseValidityStatus == 1) {
- // CASE 2: Same event, lease is valid
if (isWithinEpsilon) {
+ log.debug("CASE 2: Same event, lease is valid");
Review Comment:
We probably need to set log context, or else we'll find a bunch of such
messages with no way to correlate which flow action each relates to.
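
One low-tech way to do that, sketched with a hypothetical `LeaseLogContext`
helper (not part of the PR) that reuses the bracketed format of the
`log.info` call above; a logging-framework context mechanism would be an
alternative:

```java
class LeaseLogContext {
  // Prefixes a message with the flow action and trigger timestamp so each
  // CASE message can be correlated with the event it relates to.
  static String withContext(Object flowAction, long eventTimeMillis, String message) {
    return String.format("[%s, triggerEventTimestamp: %d] %s", flowAction, eventTimeMillis, message);
  }
}
```

Usage would look like
`log.debug(LeaseLogContext.withContext(flowAction, eventTimeMillis, "CASE 2: Same event, lease is valid"));`.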
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -275,6 +287,34 @@ else if (leaseValidityStatus == 2) {
}
}
+ protected Optional<GetEventInfoResult> createGetInfoResult(ResultSet
resultSet) {
+ try {
+ // Extract values from result set
+ Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+ Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("lease_acquisition_timestamp");
+ boolean withinEpsilon = resultSet.getBoolean("isWithinEpsilon");
+ int leaseValidityStatus = resultSet.getInt("leaseValidityStatus");
Review Comment:
seems we mix `snake_case` and `camelCase` for the column names
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -417,11 +454,12 @@ protected <T> T withPreparedStatement(String sql,
CheckedFunction<PreparedStatem
}
}
- @Data
- /*
- Class used to extract information from initial SELECT query resultSet to be
used for understanding the state of the
+
+ /**
+ Class used to store information from initial SELECT query resultSet to be
used for understanding the state of the
Review Comment:
Truly, I don't see this *storing* anything. Previously you said *extract*,
which is closer.
This being a DTO, it's more accurate to say, "Class to *convey* info...".
(Personally, I'd write
> DTO for arbiter's current lease state for a `FlowActionEvent`
)