phet commented on code in PR #3899:
URL: https://github.com/apache/gobblin/pull/3899#discussion_r1542171456
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -265,7 +266,9 @@ public static GobblinServiceManager create(GobblinServiceConfiguration serviceCo
*/
public static <T> T getClass(Class<T> classToGet) {
if (GOBBLIN_SERVICE_GUICE_MODULE == null) {
- throw new RuntimeException("getClass called without calling create method to initialize GobblinServiceGuiceModule");
+ throw new RuntimeException(String.format("getClass called to obtain %s without calling create method to "
+ + "initialize GobblinServiceGuiceModule. Stacktrace of current thread %s", classToGet,
+ Arrays.toString(Thread.currentThread().getStackTrace()).replace(", ", "\n at ")));
Review Comment:
sorry if my comment was confusing, but the `RuntimeException` should
generate a stacktrace already
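
To illustrate the point, here is a minimal sketch (the class `StackTraceDemo` and the method `getInstance` are hypothetical stand-ins, not Gobblin code). A `Throwable` records the stack of the throwing thread when it is constructed, so the frames are available to whoever catches or logs the exception without formatting them into the message by hand.

```java
// Hypothetical demo class; it only mimics the null guard in getClass.
class StackTraceDemo {
  // Throws when the (stand-in) module was never initialized.
  static <T> T getInstance(Object module, Class<T> classToGet) {
    if (module == null) {
      throw new RuntimeException(String.format(
          "getClass called to obtain %s without calling create method to "
              + "initialize GobblinServiceGuiceModule", classToGet));
    }
    return null; // the real lookup is elided in this sketch
  }

  public static void main(String[] args) {
    try {
      getInstance(null, String.class);
    } catch (RuntimeException e) {
      // The stack was captured when the exception was constructed,
      // so the top frame is the method that created it.
      System.out.println("thrown from: " + e.getStackTrace()[0].getMethodName());
      e.printStackTrace(); // prints the message followed by the full trace
    }
  }
}
```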
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -167,7 +158,7 @@ private DagTask createDagTask(DagActionStore.DagAction dagAction, MultiActiveLea
switch (dagActionType) {
case LAUNCH:
- return new LaunchDagTask(dagAction, leaseObtainedStatus);
+ return new LaunchDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
Review Comment:
why `Optional` - when would the `DMTSImpl` operate w/o a `DagActionStore`?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiterFactory.java:
##########
@@ -27,28 +27,32 @@
/**
- * An abstract base class for creating {@link MultiActiveLeaseArbiter} factories that use a specific configuration key.
+ * An abstract base class for {@link MultiActiveLeaseArbiter} factories that use a specific configuration key.
* Subclasses must provide a key to use in the constructor.
*/
@Slf4j
public abstract class MultiActiveLeaseArbiterFactory implements Provider<MultiActiveLeaseArbiter> {
private final Config config;
- private final String key;
+ private final String configPrefix;
- public MultiActiveLeaseArbiterFactory(Config config, String key) {
+ public MultiActiveLeaseArbiterFactory(Config config, String configPrefix) {
this.config = Objects.requireNonNull(config);
- this.key = Objects.requireNonNull(key);
+ this.configPrefix = Objects.requireNonNull(configPrefix);
+ if (!this.config.hasPath(configPrefix)) {
+ throw new RuntimeException(String.format("Unable to initialize multiActiveLeaseArbiter due to missing "
+ + "configurations that should be prefixed by %s.", configPrefix));
+ }
}
@Override
public MultiActiveLeaseArbiter get() {
try {
- Config leaseArbiterConfig = this.config.getConfig(key);
+ Config leaseArbiterConfig = this.config.getConfig(configPrefix);
Review Comment:
I don't see you using `this.config` anywhere, so perhaps
`leaseArbiterConfig` is what you should store in the ctor.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -80,6 +80,14 @@ public DagAction updateFlowExecutionId(long eventTimeMillis) {
*/
boolean exists(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException, SQLException;
+ /**
+ * check if the dag action exists in {@link DagActionStore} using {@link DagAction} to identify dag and specific
+ * action value
+ * @throws IOException
+ * @return true if we successfully delete one record, return false if the record does not exist
+ */
+ boolean exists(DagAction dagAction) throws IOException, SQLException;
+
Review Comment:
I don't believe there's a legit need for this method
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -41,25 +42,33 @@
public abstract class DagTask {
@Getter public final DagActionStore.DagAction dagAction;
private final MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus;
+ private final DagActionStore dagActionStore;
@Getter protected final DagManager.DagId dagId;
- public DagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ public DagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus,
+ DagActionStore dagActionStore) {
this.dagAction = dagAction;
this.leaseObtainedStatus = leaseObtainedStatus;
+ this.dagActionStore = dagActionStore;
this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
}
public abstract <T> T host(DagTaskVisitor<T> visitor);
/**
- * Any cleanup work, e.g. releasing lease if it was acquired earlier, may be done in this method.
+ * Any cleanup work, including removing the dagAction from the dagActionStore and completing the lease acquired to
+ * work on this task, is done in this method.
* Returns true if concluding dag task finished successfully otherwise false.
*/
// todo call it from the right place
public boolean conclude() {
try {
+ // If dagAction is not present in store then it has been deleted by another participant, so we can skip deletion
+ if (this.dagActionStore.exists(this.dagAction)) {
Review Comment:
agreed: there's still a check-then-act race condition, so why not just make
`deleteDagAction` idempotent, and avoid the pre-check?
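
A minimal sketch of the idempotent-delete idea, assuming a hypothetical in-memory stand-in (`InMemoryDagActionStore` and its string keys are illustrative only; the real `DagActionStore` signatures may differ). The delete is a single atomic step that reports whether anything was removed, so a caller needs no `exists` pre-check and a concurrent delete by another participant is harmless.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a dag action store; not Gobblin code.
class InMemoryDagActionStore {
  private final Set<String> actions = ConcurrentHashMap.newKeySet();

  void addDagAction(String key) {
    actions.add(key);
  }

  // Idempotent delete: removing an absent action is a no-op that returns
  // false instead of throwing, so no check-then-act window exists.
  boolean deleteDagAction(String key) {
    return actions.remove(key);
  }

  public static void main(String[] args) {
    InMemoryDagActionStore store = new InMemoryDagActionStore();
    store.addDagAction("group/flow/123/LAUNCH");
    System.out.println(store.deleteDagAction("group/flow/123/LAUNCH")); // removed
    System.out.println(store.deleteDagAction("group/flow/123/LAUNCH")); // already gone, no error
  }
}
```

A SQL-backed store can plausibly get the same behavior by treating a `DELETE` that affects zero rows as a normal outcome rather than an error.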
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -146,18 +141,14 @@ public DagTask next() {
private MultiActiveLeaseArbiter.LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.DagAction dagAction)
throws IOException, SchedulerException {
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus;
- if (!this.isMultiActiveExecutionEnabled) {
- leaseAttemptStatus = new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis(), Long.MAX_VALUE, null);
- } else {
- // TODO: need to handle reminder events and flag them
- leaseAttemptStatus = this.dagActionExecutionLeaseArbiter
- .tryAcquireLease(dagAction, System.currentTimeMillis(), false, false);
- /* Schedule a reminder for the event unless the lease has been completed to safeguard against the case where even
- we, when we might become the lease owner still fail to complete processing
- */
- if (!(leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus)) {
- scheduleReminderForEvent(leaseAttemptStatus);
- }
+ // TODO: need to handle reminder events and flag them
+ leaseAttemptStatus = this.dagActionExecutionLeaseArbiter
+ .tryAcquireLease(dagAction, System.currentTimeMillis(), false, false);
Review Comment:
`DMTSImpl` could itself take in a `Supplier<Long> currentTimeMillisClock`
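
A minimal sketch of the injected-clock suggestion, assuming a hypothetical class (`LeaseRequester` and `nextEventTimeMillis` are illustrative names, not part of the PR). Taking the clock as a `Supplier<Long>` lets production code pass `System::currentTimeMillis` while tests pass a fixed value.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a component that timestamps lease attempts.
class LeaseRequester {
  private final Supplier<Long> currentTimeMillisClock;

  LeaseRequester(Supplier<Long> currentTimeMillisClock) {
    this.currentTimeMillisClock = currentTimeMillisClock;
  }

  // Reads the time from the injected clock instead of calling
  // System.currentTimeMillis() directly.
  long nextEventTimeMillis() {
    return currentTimeMillisClock.get();
  }

  public static void main(String[] args) {
    LeaseRequester production = new LeaseRequester(System::currentTimeMillis);
    LeaseRequester fixed = new LeaseRequester(() -> 1_000L); // deterministic clock for tests
    System.out.println(production.nextEventTimeMillis() > 0);
    System.out.println(fixed.nextEventTimeMillis());
  }
}
```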