phet commented on code in PR #3715:
URL: https://github.com/apache/gobblin/pull/3715#discussion_r1260476610
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java:
##########
@@ -32,6 +33,7 @@
@ToString
+@Slf4j
Review Comment:
just wondering... is the logging actually used? otherwise, how did the
prior version compile w/o either this annotation or an explicit field?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -54,6 +55,10 @@
* than epsilon and encapsulate executor communication latency
including retry attempts
*
* The `event_timestamp` is the time of the flow_action event request.
+ * --- Note ---
+ * We only use the participant's local event_timestamp internally to identify
the particular flow_action event, but
+ * after interacting with the database to the database utilize the
CURRENT_TIMESTAMP of the database to insert or keep
Review Comment:
looks like repeated "with/to the database"
(and true that it avoids local/db time discrepancies, but the biggest
concern is of clock drift between participants)
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -82,39 +87,43 @@ protected interface CheckedFunction<T, R> {
private final int linger;
// TODO: define retention on this table
- private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %S ("
+ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")
NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " +
"flow_execution_id varchar("
+ ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL,
flow_action varchar(100) NOT NULL, "
+ "event_timestamp TIMESTAMP, "
- + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ + "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT '1970-01-02
00:00:00', "
Review Comment:
this choice of default seems questionable... what advantage does it have
over just using `null`?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -137,79 +146,98 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws
IOException {
this.linger = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+ String createArbiterStatement = String.format(
+ CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
try (Connection connection = dataSource.getConnection();
- PreparedStatement createStatement =
connection.prepareStatement(String.format(
- CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+ PreparedStatement createStatement =
connection.prepareStatement(createArbiterStatement)) {
createStatement.executeUpdate();
connection.commit();
} catch (SQLException e) {
throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
}
- withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT,
this.constantsTableName, this.constantsTableName),
- createStatement -> {
- int i = 0;
- createStatement.setInt(++i, epsilon);
- createStatement.setInt(++i, linger);
- return createStatement.executeUpdate();}, true);
+ String createConstantsStatement =
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
Review Comment:
could be nice to abstract into a method, named for the high-level idea of
what's going on over these 20 or so LoC
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -137,79 +146,98 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws
IOException {
this.linger = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+ String createArbiterStatement = String.format(
+ CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
try (Connection connection = dataSource.getConnection();
- PreparedStatement createStatement =
connection.prepareStatement(String.format(
- CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+ PreparedStatement createStatement =
connection.prepareStatement(createArbiterStatement)) {
createStatement.executeUpdate();
connection.commit();
} catch (SQLException e) {
throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
}
- withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT,
this.constantsTableName, this.constantsTableName),
- createStatement -> {
- int i = 0;
- createStatement.setInt(++i, epsilon);
- createStatement.setInt(++i, linger);
- return createStatement.executeUpdate();}, true);
+ String createConstantsStatement =
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+ withPreparedStatement(createConstantsStatement, createStatement ->
createStatement.executeUpdate(), true);
+
+ int count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT,
this.constantsTableName), getStatement -> {
+ ResultSet resultSet = getStatement.executeQuery();
+ if (resultSet.next()) {
+ return resultSet.getInt(1);
+ }
+ return -1;
Review Comment:
how about `Optional<Integer>` rather than this sentinel value?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -219,84 +247,90 @@ else if (leaseValidityStatus == 2) {
// Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
String formattedAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfMatchingAllStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction);
} // No longer leasing this event
- // CASE 6: Same event, no longer leasing event in db: terminate
+ // CASE 5: Same event, no longer leasing event in db: terminate
if (isWithinEpsilon) {
return new NoLongerLeasingStatus();
}
- // CASE 7: Distinct event, no longer leasing event in db
+ // CASE 6: Distinct event, no longer leasing event in db
// Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
String formattedAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfFinishedStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
false, dbEventTimestamp, null);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
/**
- * Attempt lease by insert or update following a read based on the condition
the state of the table has not changed
- * since the read. Parse the result to return the corresponding status based
on successful insert/update or not.
- * @param resultSet
- * @param eventTimeMillis
- * @return LeaseAttemptStatus
+ * Parse result of attempted insert/update to obtain a lease for a
+ * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} event by
selecting values corresponding to that
+ * event from the table to return the corresponding status based on
successful insert/update or not.
* @throws SQLException
* @throws IOException
*/
- protected LeaseAttemptStatus
handleResultFromAttemptedLeaseObtainment(ResultSet resultSet,
- DagActionStore.DagAction flowAction, long eventTimeMillis)
+ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int
numRowsUpdated,
+ DagActionStore.DagAction flowAction)
throws SQLException, IOException {
- if (!resultSet.next()) {
- throw new IOException("Expected num rows and lease_acquisition_timestamp
returned from query but received nothing");
- }
- int numRowsUpdated = resultSet.getInt(1);
- long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
- int dbLinger = resultSet.getInt(3);
+ // Fetch values in row after attempted insert
+ String formattedSelectAfterInsertStatement =
+ String.format(SELECT_AFTER_INSERT_STATEMENT,
this.leaseArbiterTableName, this.constantsTableName);
Review Comment:
wondering: is it truly necessary to bind the table name every time this
query is made? e.g. couldn't it be done once when the instance is constructed?
or might the table names ever change throughout the instance's lifetime?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -137,79 +146,98 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws
IOException {
this.linger = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+ String createArbiterStatement = String.format(
+ CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
try (Connection connection = dataSource.getConnection();
- PreparedStatement createStatement =
connection.prepareStatement(String.format(
- CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+ PreparedStatement createStatement =
connection.prepareStatement(createArbiterStatement)) {
createStatement.executeUpdate();
connection.commit();
} catch (SQLException e) {
throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
}
- withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT,
this.constantsTableName, this.constantsTableName),
- createStatement -> {
- int i = 0;
- createStatement.setInt(++i, epsilon);
- createStatement.setInt(++i, linger);
- return createStatement.executeUpdate();}, true);
+ String createConstantsStatement =
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+ withPreparedStatement(createConstantsStatement, createStatement ->
createStatement.executeUpdate(), true);
+
+ int count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT,
this.constantsTableName), getStatement -> {
+ ResultSet resultSet = getStatement.executeQuery();
+ if (resultSet.next()) {
+ return resultSet.getInt(1);
+ }
+ return -1;
+ }, true);
+
+ // Only insert epsilon and linger values from config if this table does
not contain pre-existing values.
+ if (count == 0) {
+ String insertConstantsStatement =
String.format(INSERT_IN_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+ withPreparedStatement(insertConstantsStatement, insertStatement -> {
+ int i = 0;
+ insertStatement.setInt(++i, epsilon);
+ insertStatement.setInt(++i, linger);
+ return insertStatement.executeUpdate();
+ }, true);
+ }
+
+ log.info("MysqlMultiActiveLeaseArbiter initialized");
}
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
// Check table for an existing entry for this flow action and event time
- ResultSet resultSet = withPreparedStatement(
+ GetEventInfoResult getResult = withPreparedStatement(
String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName),
getInfoStatement -> {
int i = 0;
- getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis));
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
getInfoStatement.setString(++i,
flowAction.getFlowActionType().toString());
- return getInfoStatement.executeQuery();
+ ResultSet resultSet = getInfoStatement.executeQuery();
+ if (!resultSet.next()) {
+ return null;
+ }
+ return new GetEventInfoResult(resultSet);
}, true);
- String formattedSelectAfterInsertStatement =
- String.format(SELECT_AFTER_INSERT_STATEMENT,
this.leaseArbiterTableName, this.constantsTableName);
try {
// CASE 1: If no existing row for this flow action, then go ahead and
insert
- if (!resultSet.next()) {
+ if (getResult == null) {
+ log.debug("CASE 1: no existing row for this flow action, then go ahead
and insert");
Review Comment:
may be worth writing only in `log.debug`, not also as a comment just above.
[update: I see other cases lack logging. I'd imagine tracing more helpful to
debugging than a source comment.]
also, I'm forgetting on the numbering... did each case receive a number
somewhere previously, which you're referencing here (e.g. in a javadoc comment)?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -62,15 +63,15 @@
public class FlowTriggerHandler {
private final int schedulerMaxBackoffMillis;
private static Random random = new Random();
- protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ protected Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter;
protected SchedulerService schedulerService;
- protected DagActionStore dagActionStore;
+ protected Optional<DagActionStore> dagActionStore;
Review Comment:
great! I strongly prefer this to `null` ;)
even so, since the two must work together, the ultimate abstraction would be
a Strategy interface w/ capability to persist the flow action and handle the
trigger event. in that approach, this ctor would pass these two optionals to a
factory to get an appropriate instance of the strategy. when both present, the
factory would give a strategy using them both, but when they're both missing, a
"null strategy" would be returned. the factory would throw if only one were
given.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -82,39 +87,43 @@ protected interface CheckedFunction<T, R> {
private final int linger;
// TODO: define retention on this table
- private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %S ("
+ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")
NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " +
"flow_execution_id varchar("
+ ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL,
flow_action varchar(100) NOT NULL, "
+ "event_timestamp TIMESTAMP, "
- + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ + "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT '1970-01-02
00:00:00', "
+ "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE
IF NOT EXISTS %s "
- + "(epsilon INT, linger INT), PRIMARY KEY (epsilon, linger); INSERT INTO
%s (epsilon, linger) VALUES (?,?)";
+ + "(epsilon INT, linger INT, PRIMARY KEY (epsilon, linger))";
+ private static final String GET_ROW_COUNT_STATEMENT = "SELECT COUNT(*) FROM
%s";
+ private static final String INSERT_IN_CONSTANTS_TABLE_STATEMENT = "INSERT
INTO %s (epsilon, linger) VALUES (?,?)";
protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ " AND flow_action=?";
protected static final String WHERE_CLAUSE_TO_MATCH_ROW =
WHERE_CLAUSE_TO_MATCH_KEY
+ " AND event_timestamp=? AND lease_acquisition_timestamp=?";
- protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
- + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
- // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
- // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
current time in db is within epsilon of
+ // event_timestamp), leaseValidityStatus (1 if lease has not expired, 2 if
expired, 3 if column is NULL or no longer
+ // leasing)
protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
- + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
- + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
- + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
- + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+ + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP) / 1000
<= epsilon as isWithinEpsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 1 "
+ + "WHEN CURRENT_TIMESTAMP >= DATE_ADD(lease_acquisition_timestamp,
INTERVAL linger*1000 MICROSECOND) then 2 "
+ + "ELSE 3 END as leaseValidityStatus, linger, CURRENT_TIMESTAMP FROM %s,
%s " + WHERE_CLAUSE_TO_MATCH_KEY;
Review Comment:
I don't know enough about sql's evaluation rules to be certain whether the
multiple invocations of `CURRENT_TIMESTAMP` all resolve to a common evaluation
or would be invoked multiple times (and if so, whether in a defined order or
not). do you know?
if this leads to multiple evaluations with no ordering guarantee, would all
logic remain internally consistent and provide an atomic view across multiple
fields?
if not, tge alternative would be to evaluate `CURRENT_TIMESTAMP` only once
in an inner (nested) query.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -384,8 +411,60 @@ protected <T> T withPreparedStatement(String sql,
CheckedFunction<PreparedStatem
}
return result;
} catch (SQLException e) {
- log.warn("Received SQL exception that can result from invalid
connection. Checking if validation query is set {} Exception is {}",
((HikariDataSource) this.dataSource).getConnectionTestQuery(), e);
+ log.warn("Received SQL exception that can result from invalid
connection. Checking if validation query is set {} "
+ + "Exception is {}", ((HikariDataSource)
this.dataSource).getConnectionTestQuery(), e);
throw new IOException(e);
}
}
+
+ @Data
+ /*
+ Class used to extract information from initial SELECT query resultSet to be
used for understanding the state of the
+ flow action event's lease in the arbiter store and act accordingly.
+ */
Review Comment:
minor, but
a. make this class javadoc via `/**`
b. I usually see annotations following class-level comments, not before the
comments
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -384,8 +411,60 @@ protected <T> T withPreparedStatement(String sql,
CheckedFunction<PreparedStatem
}
return result;
} catch (SQLException e) {
- log.warn("Received SQL exception that can result from invalid
connection. Checking if validation query is set {} Exception is {}",
((HikariDataSource) this.dataSource).getConnectionTestQuery(), e);
+ log.warn("Received SQL exception that can result from invalid
connection. Checking if validation query is set {} "
+ + "Exception is {}", ((HikariDataSource)
this.dataSource).getConnectionTestQuery(), e);
throw new IOException(e);
}
}
+
+ @Data
+ /*
+ Class used to extract information from initial SELECT query resultSet to be
used for understanding the state of the
+ flow action event's lease in the arbiter store and act accordingly.
+ */
+ class GetEventInfoResult {
+ private Timestamp dbEventTimestamp;
+ private Timestamp dbLeaseAcquisitionTimestamp;
+ private boolean withinEpsilon;
+ private int leaseValidityStatus;
+ private int dbLinger;
+ private Timestamp dbCurrentTimestamp;
+
+ GetEventInfoResult(ResultSet resultSet) {
Review Comment:
nit, but rather than defining a variant constructor, this would be better as
a factory method:
```
public static fromResultSet(ResultSet rs);
```
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -137,79 +146,98 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws
IOException {
this.linger = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+ String createArbiterStatement = String.format(
+ CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
try (Connection connection = dataSource.getConnection();
- PreparedStatement createStatement =
connection.prepareStatement(String.format(
- CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+ PreparedStatement createStatement =
connection.prepareStatement(createArbiterStatement)) {
createStatement.executeUpdate();
connection.commit();
} catch (SQLException e) {
throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
}
- withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT,
this.constantsTableName, this.constantsTableName),
- createStatement -> {
- int i = 0;
- createStatement.setInt(++i, epsilon);
- createStatement.setInt(++i, linger);
- return createStatement.executeUpdate();}, true);
+ String createConstantsStatement =
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+ withPreparedStatement(createConstantsStatement, createStatement ->
createStatement.executeUpdate(), true);
+
+ int count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT,
this.constantsTableName), getStatement -> {
+ ResultSet resultSet = getStatement.executeQuery();
+ if (resultSet.next()) {
+ return resultSet.getInt(1);
+ }
+ return -1;
+ }, true);
+
+ // Only insert epsilon and linger values from config if this table does
not contain pre-existing values.
+ if (count == 0) {
+ String insertConstantsStatement =
String.format(INSERT_IN_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+ withPreparedStatement(insertConstantsStatement, insertStatement -> {
+ int i = 0;
+ insertStatement.setInt(++i, epsilon);
+ insertStatement.setInt(++i, linger);
+ return insertStatement.executeUpdate();
+ }, true);
+ }
+
+ log.info("MysqlMultiActiveLeaseArbiter initialized");
}
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
// Check table for an existing entry for this flow action and event time
- ResultSet resultSet = withPreparedStatement(
+ GetEventInfoResult getResult = withPreparedStatement(
String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName),
getInfoStatement -> {
int i = 0;
- getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis));
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
getInfoStatement.setString(++i,
flowAction.getFlowActionType().toString());
- return getInfoStatement.executeQuery();
+ ResultSet resultSet = getInfoStatement.executeQuery();
+ if (!resultSet.next()) {
+ return null;
Review Comment:
`Optional` is nice for the compile-time reminder to explicitly consider
`null` (`Optional.absent`)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]