umustafi commented on code in PR #3715:
URL: https://github.com/apache/gobblin/pull/3715#discussion_r1267236668
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -219,84 +258,142 @@ else if (leaseValidityStatus == 2) {
// Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
String formattedAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfMatchingAllStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} // No longer leasing this event
- // CASE 6: Same event, no longer leasing event in db: terminate
if (isWithinEpsilon) {
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5:
Same event, no longer leasing event in db: "
+ + "terminate", flowAction, dbCurrentTimestamp.getTime());
return new NoLongerLeasingStatus();
}
- // CASE 7: Distinct event, no longer leasing event in db
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6:
Distinct event, no longer leasing event in "
+ + "db", flowAction, dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
String formattedAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfFinishedStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
false, dbEventTimestamp, null);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
+ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws
IOException {
+ try {
+ // Extract values from result set
+ Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+ Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("lease_acquisition_timestamp");
+ boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
+ int leaseValidityStatus = resultSet.getInt("lease_validity_status");
+ int dbLinger = resultSet.getInt("linger");
+ Timestamp dbCurrentTimestamp =
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+ return new GetEventInfoResult(dbEventTimestamp,
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+ dbLinger, dbCurrentTimestamp);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ if (resultSet != null) {
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ }
+
+ protected SelectInfoResult createSelectInfoResult(ResultSet resultSet)
throws IOException {
+ try {
+ if (!resultSet.next()) {
+ log.error("Expected num rows and lease_acquisition_timestamp
returned from query but received nothing");
Review Comment:
Oh, good catch. I want this code path to terminate, so I will throw an
IOException instead.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -219,84 +258,142 @@ else if (leaseValidityStatus == 2) {
// Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
String formattedAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfMatchingAllStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} // No longer leasing this event
- // CASE 6: Same event, no longer leasing event in db: terminate
if (isWithinEpsilon) {
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5:
Same event, no longer leasing event in db: "
+ + "terminate", flowAction, dbCurrentTimestamp.getTime());
return new NoLongerLeasingStatus();
}
- // CASE 7: Distinct event, no longer leasing event in db
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6:
Distinct event, no longer leasing event in "
+ + "db", flowAction, dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
String formattedAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfFinishedStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
false, dbEventTimestamp, null);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
+ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws
IOException {
+ try {
+ // Extract values from result set
+ Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+ Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("lease_acquisition_timestamp");
+ boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
+ int leaseValidityStatus = resultSet.getInt("lease_validity_status");
+ int dbLinger = resultSet.getInt("linger");
+ Timestamp dbCurrentTimestamp =
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+ return new GetEventInfoResult(dbEventTimestamp,
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+ dbLinger, dbCurrentTimestamp);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ if (resultSet != null) {
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ }
+
+ protected SelectInfoResult createSelectInfoResult(ResultSet resultSet)
throws IOException {
+ try {
+ if (!resultSet.next()) {
+ log.error("Expected num rows and lease_acquisition_timestamp
returned from query but received nothing");
Review Comment:
Oh, good catch. I want this code path to terminate, so I will throw an
IOException instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]