[
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=882991&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-882991
]
ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 02/Oct/23 20:58
Start Date: 02/Oct/23 20:58
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1343113772
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task or job in a Dag.
+ * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it
will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter}
as complete
+ * @param <T>
+ */
+
+@Alpha
+public abstract class DagTask {
+
+ protected MultiActiveLeaseArbiter.LeaseObtainedStatus
leaseObtainedStatusStatus;
+
+ /**
+ * Currently, conclusion of {@link DagTask} marks and records a successful
release of lease.
+ * It is invoked after {@link DagProc#process(DagManagementStateStore)} is
completed successfully.
+ * @param multiActiveLeaseArbiter
+ * @throws IOException
+ */
+ public void conclude(MultiActiveLeaseArbiter multiActiveLeaseArbiter) throws
IOException {
+ multiActiveLeaseArbiter.recordLeaseSuccess(leaseObtainedStatusStatus);
Review Comment:
doesn't this return a boolean (of whether success)? if so, be sure not to
swallow the return value
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state,
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+ abstract protected S initialize(DagManagementStateStore
dagManagementStateStore) throws MaybeRetryableException, IOException;
+ abstract protected R act(S state, DagManagementStateStore
dagManagementStateStore) throws MaybeRetryableException, Exception;
+ abstract protected void sendNotification(R result, EventSubmitter
eventSubmitter) throws MaybeRetryableException, IOException;
+
+ public final void process(DagManagementStateStore dagManagementStateStore,
EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
+ try {
+ S state = this.initializeWithRetries(dagManagementStateStore,
maxRetryCount, delayRetryMillis);
+ R result = this.actWithRetries(state, dagManagementStateStore,
maxRetryCount, delayRetryMillis); // may be pass state store too here
Review Comment:
is the comment obsolete?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -313,4 +317,42 @@ protected static long getUTCTimeFromDelayPeriod(long
delayPeriodMillis) {
Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
}
+
+ /**
+ * Attempts to acquire lease for a given {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+ * through lease arbitration and if it fails, it will create and schedule a
reminder trigger to check back again.
Review Comment:
I agree with the advice!
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -136,14 +144,22 @@ protected void processMessage(DecodeableKafkaRecord
message) {
// We only expect INSERT and DELETE operations done to this table. INSERTs
correspond to any type of
// {@link DagActionStore.FlowActionType} flow requests that have to be
processed. DELETEs require no action.
try {
+
if (operation.equals("INSERT")) {
if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
log.info("Received insert dag action and about to send resume flow
request");
dagManager.handleResumeFlowRequest(flowGroup,
flowName,Long.parseLong(flowExecutionId));
+ //TODO: add a flag for if condition only if multi-active is enabled
this.resumesInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
log.info("Received insert dag action and about to send kill flow
request");
dagManager.handleKillFlowRequest(flowGroup, flowName,
Long.parseLong(flowExecutionId));
+
+ if(isMultiLeaderDagManagerEnabled) {
+ DagActionStore.DagAction killAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.KILL);
Review Comment:
is `killFlow` called in any other place? if we don't actually have a
`DagAction` already on hand in any of the callsites, not sure it makes sense to
use that in the API. perhaps it should be:
```
killFlow(flowGroup, flowName, flowExecutionId, produceTimestamp)
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task or job in a Dag.
+ * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it
will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter}
as complete
+ * @param <T>
+ */
+
+@Alpha
+public abstract class DagTask {
+
+ protected MultiActiveLeaseArbiter.LeaseObtainedStatus
leaseObtainedStatusStatus;
Review Comment:
why not `private final`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state,
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+ abstract protected S initialize(DagManagementStateStore
dagManagementStateStore) throws MaybeRetryableException, IOException;
+ abstract protected R act(S state, DagManagementStateStore
dagManagementStateStore) throws MaybeRetryableException, Exception;
+ abstract protected void sendNotification(R result, EventSubmitter
eventSubmitter) throws MaybeRetryableException, IOException;
Review Comment:
not 100% sure if this is in line with our coding standard... but I always put
`public` before `protected`, whether or not `abstract`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -94,6 +99,9 @@ public DagActionStoreChangeMonitor(String topic, Config
config, DagActionStore d
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+ // instantiating using default ctor; subsequent PR will handle
instantiating with multi-args ctor
+// this.dagTaskStream = new DagTaskStream();
Review Comment:
so this isn't initialized anywhere? won't we get an NPE when using it?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
Review Comment:
suggestion: `Encapsulates task-specific handling appropriate to a given
{@link DagTask} derived type.`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task or job in a Dag.
Review Comment:
"job" already has other connotations as what we run on executors. maybe
"defines a singular task in the lifecycle of a managed Dag"?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state,
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+ abstract protected S initialize(DagManagementStateStore
dagManagementStateStore) throws MaybeRetryableException, IOException;
+ abstract protected R act(S state, DagManagementStateStore
dagManagementStateStore) throws MaybeRetryableException, Exception;
+ abstract protected void sendNotification(R result, EventSubmitter
eventSubmitter) throws MaybeRetryableException, IOException;
+
+ public final void process(DagManagementStateStore dagManagementStateStore,
EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
+ try {
+ S state = this.initializeWithRetries(dagManagementStateStore,
maxRetryCount, delayRetryMillis);
+ R result = this.actWithRetries(state, dagManagementStateStore,
maxRetryCount, delayRetryMillis); // may be pass state store too here
+ this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount,
delayRetryMillis);
+ log.info("Successfully processed Dag Request");
+ } catch (Exception ex) {
+ throw new RuntimeException("Cannot process Dag Request: ", ex);
+ }
+ }
+
+ protected final S initializeWithRetries(DagManagementStateStore
dagManagementStateStore, int maxRetryCount, long delayRetryMillis) throws
IOException {
+ for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
+ try {
+ return this.initialize(dagManagementStateStore);
+ } catch (MaybeRetryableException e) {
+ if (retryCount < maxRetryCount - 1) { // Don't wait before the last
retry
+ waitBeforeRetry(delayRetryMillis);
+ }
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ throw new RuntimeException("Max retry attempts reached. Cannot initialize
Dag");
+ }
Review Comment:
1. try reworking more generically as:
```
protected final <T> T execWithRetries(Supplier<T> exec, int maxRetryCount,
long delayRetryMillis) {
...
}
```
and call like:
```
R result = this.execWithRetries(() -> this.act(state,
dagManagementStateStore), maxRetryCount, delayRetryMillis)
```
2. probably don't wrap exceptions, given that's what `process` already plans
to do... unless you're merely trying to tunnel them past the `Supplier`'s
exception signature. if so, then create a `CheckedExceptionSupplier` in the
vein of -
https://github.com/apache/gobblin/blob/028b85f587e3c1e6afa5d8662fe9ed3f0087568d/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java#L31
3. the final missing piece is to remain aware of the timeframe for the
task's lease expiration. when it's close (or already passed) do not try (and
certainly don't retry). instead throw a particular exception to mark the
situation
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -311,4 +315,40 @@ protected static long getUTCTimeFromDelayPeriod(long
delayPeriodMillis) {
Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
}
+
+ /**
+ * Attempts to acquire lease for a given {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+ * through lease arbitration and if it fails, it will create and schedule a
reminder trigger to check back again.
+ * @param jobProps
+ * @param flowAction
+ * @param eventTimeMillis
+ * @return optionally leaseObtainedStatus if acquired; otherwise schedule
reminder to check back again.
+ * @throws IOException
+ */
+ public MultiActiveLeaseArbiter.LeaseAttemptStatus
getLeaseOnDagAction(Properties jobProps, DagActionStore.DagAction flowAction,
long eventTimeMillis) throws IOException {
+
+ if (multiActiveLeaseArbiter.isPresent()) {
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ this.leaseObtainedCount.inc();
+ log.info("Successfully acquired lease for dag action: {}", flowAction);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ this.leasedToAnotherStatusCount.inc();
+ scheduleReminderForEvent(jobProps,
+ (MultiActiveLeaseArbiter.LeasedToAnotherStatus)
leaseAttemptStatus, eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ this.noLongerLeasingStatusCount.inc();
+ log.info("Received type of leaseAttemptStatus: [{}, eventTimestamp:
{}] ", leaseAttemptStatus.getClass().getName(),
+ eventTimeMillis);
+ }
+ return leaseAttemptStatus;
+ } else {
+ throw new RuntimeException(String.format("Multi-active scheduler is not
enabled so trigger event should not be "
+ + "handled with this method."));
+ }
+ }
+
+ public MultiActiveLeaseArbiter getMultiActiveLeaseArbiter() {
+ return this.multiActiveLeaseArbiter.get();
+ }
Review Comment:
no need for this, if you indeed modify the existing `handleTriggerEvent` on
line 109
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends
DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> {
+
+ private KillDagTask killDagTask;
+
+ public KillDagProc(KillDagTask killDagTask) {
+ this.killDagTask = killDagTask;
+ }
Review Comment:
tip: mark this `final` and implement via `@RequiredArgsConstructor`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -80,6 +83,8 @@ public String load(String key) throws Exception {
protected Orchestrator orchestrator;
protected boolean isMultiActiveSchedulerEnabled;
protected FlowCatalog flowCatalog;
+ private DagTaskStream dagTaskStream;
Review Comment:
this change monitor shouldn't use a task stream as such (that's exclusively
for the `DagProcessingEngine`.) instead let it write to the `DagManagement` API
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends
DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> {
+
+ private KillDagTask killDagTask;
+
+ public KillDagProc(KillDagTask killDagTask) {
+ this.killDagTask = killDagTask;
+ }
+
+ @Override
+ protected List<Dag.DagNode<JobExecutionPlan>>
initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+ String dagToCancel = this.killDagTask.getKillDagId().toString();
Review Comment:
nit: since it's a `String`, not a dag, I'd name it `dagId` or `dagIdToCancel`.
Anyway, for such a short impl (unless you're planning to log it), I'd just
make it a one-liner without an intermediate var.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends
DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> {
+
+ private KillDagTask killDagTask;
+
+ public KillDagProc(KillDagTask killDagTask) {
+ this.killDagTask = killDagTask;
+ }
+
+ @Override
+ protected List<Dag.DagNode<JobExecutionPlan>>
initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+ String dagToCancel = this.killDagTask.getKillDagId().toString();
+ return dagManagementStateStore.getJobs(dagToCancel);
+ }
+
+ /**
+ * Post initialization of the Dag with the current state, it will identify
the {@link Dag.DagNode}s to be killed
+ * and cancel the job on the executor. The return type is kept as {@link
Object} since we might want to refactor
+ * or add more responsibility as part of the actions taken. Hence, after
completing all possible scenarios,
+ * it will make sense to update the method signature with its appropriate
type.
+ * @param dagNodesToCancel
+ * @return
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws IOException
+ */
+ @Override
+ protected Dag<JobExecutionPlan> act(List<Dag.DagNode<JobExecutionPlan>>
dagNodesToCancel, DagManagementStateStore dagManagementStateStore) throws
Exception {
+ String dagToCancel = this.killDagTask.getKillDagId().toString();
+
+ log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
+ for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+ killDagNode(dagNodeToCancel);
+ }
+
dagManagementStateStore.getDag(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+ dagManagementStateStore.getDag(dagToCancel).setMessage("Flow killed by
request");
+
dagManagementStateStore.removeDagActionFromStore(this.killDagTask.getKillDagId(),
DagActionStore.FlowActionType.KILL);
+ return dagManagementStateStore.getDag(dagToCancel);
+
+ }
+
+ @Override
+ protected void sendNotification(Dag<JobExecutionPlan> dag, EventSubmitter
eventSubmitter) throws MaybeRetryableException {
+ for(Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dag.getNodes()) {
Review Comment:
please update your IDE to catch when you forget the space between `for (`...
this happens all over this PR
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends
DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> {
+
+ private KillDagTask killDagTask;
+
+ public KillDagProc(KillDagTask killDagTask) {
+ this.killDagTask = killDagTask;
+ }
+
+ @Override
+ protected List<Dag.DagNode<JobExecutionPlan>>
initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+ String dagToCancel = this.killDagTask.getKillDagId().toString();
+ return dagManagementStateStore.getJobs(dagToCancel);
+ }
+
+ /**
+ * Post initialization of the Dag with the current state, it will identify
the {@link Dag.DagNode}s to be killed
+ * and cancel the job on the executor. The return type is kept as {@link
Object} since we might want to refactor
+ * or add more responsibility as part of the actions taken. Hence, after
completing all possible scenarios,
+ * it will make sense to update the method signature with its appropriate
type.
+ * @param dagNodesToCancel
+ * @return
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws IOException
+ */
+ @Override
+ protected Dag<JobExecutionPlan> act(List<Dag.DagNode<JobExecutionPlan>>
dagNodesToCancel, DagManagementStateStore dagManagementStateStore) throws
Exception {
+ String dagToCancel = this.killDagTask.getKillDagId().toString();
+
+ log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
+ for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+ killDagNode(dagNodeToCancel);
+ }
+
dagManagementStateStore.getDag(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+ dagManagementStateStore.getDag(dagToCancel).setMessage("Flow killed by
request");
+
dagManagementStateStore.removeDagActionFromStore(this.killDagTask.getKillDagId(),
DagActionStore.FlowActionType.KILL);
Review Comment:
nits:
1. Move the `dagToCancel` initialization after the `for` loop (BTW, it holds a DAG ID, not a DAG, so the name is misleading).
2. Call `dagManagementStateStore.getDag(dagToCancel)` only once and reuse the result.
3. Decide whether an intermediate `dagToCancel` variable is still needed at all.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends
DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> {
+
+ private KillDagTask killDagTask;
+
+ public KillDagProc(KillDagTask killDagTask) {
+ this.killDagTask = killDagTask;
+ }
+
+ @Override
+ protected List<Dag.DagNode<JobExecutionPlan>>
initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+ String dagToCancel = this.killDagTask.getKillDagId().toString();
+ return dagManagementStateStore.getJobs(dagToCancel);
+ }
+
+ /**
+ * Post initialization of the Dag with the current state, it will identify
the {@link Dag.DagNode}s to be killed
+ * and cancel the job on the executor. The return type is kept as {@link
Object} since we might want to refactor
+ * or add more responsibility as part of the actions taken. Hence, after
completing all possible scenarios,
+ * it will make sense to update the method signature with its appropriate
type.
Review Comment:
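This Javadoc is out of date. It says the return type is kept as `Object`, but `act` now returns `Dag<JobExecutionPlan>`. The `@throws` list (`InterruptedException`, `ExecutionException`, `IOException`) does not match the declared `throws Exception`. `@param` does not document `dagManagementStateStore`.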
Issue Time Tracking
-------------------
Worklog Id: (was: 882991)
Time Spent: 12h 10m (was: 12h)
> Refactor code to move current in-memory references to new design for REST
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1910
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 12h 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)