phet commented on code in PR #3776: URL: https://github.com/apache/gobblin/pull/3776#discussion_r1343113772
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration.task; + +import java.io.IOException; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor; +import org.apache.gobblin.service.modules.orchestration.proc.DagProc; + + +/** + * Defines an individual task or job in a Dag. + * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it will mark the lease + * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete + * @param <T> + */ + +@Alpha +public abstract class DagTask { + + protected MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatusStatus; + + /** + * Currently, conclusion of {@link DagTask} marks and records a successful release of lease. + * It is invoked after {@link DagProc#process(DagManagementStateStore)} is completed successfully. 
+ * @param multiActiveLeaseArbiter + * @throws IOException + */ + public void conclude(MultiActiveLeaseArbiter multiActiveLeaseArbiter) throws IOException { + multiActiveLeaseArbiter.recordLeaseSuccess(leaseObtainedStatusStatus); Review Comment: doesn't this return a boolean (of whether success)? if so, be sure not to swallow the return value ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration.proc; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; +import org.apache.gobblin.service.modules.orchestration.task.DagTask; + + +/** + * Responsible to performing the actual work for a given {@link DagTask}. 
+ * It processes the {@link DagTask} by first initializing its state, performing actions + * based on the type of {@link DagTask} and finally submitting an event to the executor. + * @param <S> current state of the dag node + * @param <R> result after processing the dag node + */ +@Alpha +@Slf4j +public abstract class DagProc<S, R> { + + abstract protected S initialize(DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, IOException; + abstract protected R act(S state, DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, Exception; + abstract protected void sendNotification(R result, EventSubmitter eventSubmitter) throws MaybeRetryableException, IOException; + + public final void process(DagManagementStateStore dagManagementStateStore, EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) { + try { + S state = this.initializeWithRetries(dagManagementStateStore, maxRetryCount, delayRetryMillis); + R result = this.actWithRetries(state, dagManagementStateStore, maxRetryCount, delayRetryMillis); // may be pass state store too here Review Comment: is the comment obsolete? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java: ########## @@ -313,4 +317,42 @@ protected static long getUTCTimeFromDelayPeriod(long delayPeriodMillis) { Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant()); return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date); } + + /** + * Attempts to acquire lease for a given {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} + * through lease arbitration and if it fails, it will create and schedule a reminder trigger to check back again. Review Comment: I agree with the advice! 
########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java: ########## @@ -136,14 +144,22 @@ protected void processMessage(DecodeableKafkaRecord message) { // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of // {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action. try { + if (operation.equals("INSERT")) { if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) { log.info("Received insert dag action and about to send resume flow request"); dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId)); + //TODO: add a flag for if condition only if multi-active is enabled this.resumesInvoked.mark(); } else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) { log.info("Received insert dag action and about to send kill flow request"); dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId)); + + if(isMultiLeaderDagManagerEnabled) { + DagActionStore.DagAction killAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.KILL); Review Comment: is `killFlow` called in any other place? if we don't actually have a `DagAction` already on hand in any of the callsites, not sure it makes sense to use that in the API. perhaps it should be: ``` killFlow(flowGroup, flowName, flowExecutionId, produceTimestamp) ``` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration.task; + +import java.io.IOException; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor; +import org.apache.gobblin.service.modules.orchestration.proc.DagProc; + + +/** + * Defines an individual task or job in a Dag. + * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it will mark the lease + * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete + * @param <T> + */ + +@Alpha +public abstract class DagTask { + + protected MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatusStatus; Review Comment: why not `private final`? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration.proc; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; +import org.apache.gobblin.service.modules.orchestration.task.DagTask; + + +/** + * Responsible to performing the actual work for a given {@link DagTask}. + * It processes the {@link DagTask} by first initializing its state, performing actions + * based on the type of {@link DagTask} and finally submitting an event to the executor. + * @param <S> current state of the dag node + * @param <R> result after processing the dag node + */ +@Alpha +@Slf4j +public abstract class DagProc<S, R> { + + abstract protected S initialize(DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, IOException; + abstract protected R act(S state, DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, Exception; + abstract protected void sendNotification(R result, EventSubmitter eventSubmitter) throws MaybeRetryableException, IOException; Review Comment: not 100% sure if this is in line w/ coding standard... 
but I always put `public` before `protected`, whether or not `abstract` ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java: ########## @@ -94,6 +99,9 @@ public DagActionStoreChangeMonitor(String topic, Config config, DagActionStore d this.flowCatalog = flowCatalog; this.orchestrator = orchestrator; this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled; + // instantiating using default ctor; subsequent PR will handle instantiating with multi-args ctor +// this.dagTaskStream = new DagTaskStream(); Review Comment: so this isn't initialized anywhere? won't we get an NPE when using it? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration.proc; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; +import org.apache.gobblin.service.modules.orchestration.task.DagTask; + + +/** + * Responsible to performing the actual work for a given {@link DagTask}. Review Comment: suggestion: `Encapsulates task-specific handling appropriate to a given {@link DagTask} derived type.` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration.task; + +import java.io.IOException; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor; +import org.apache.gobblin.service.modules.orchestration.proc.DagProc; + + +/** + * Defines an individual task or job in a Dag. Review Comment: "job" already has other connotations as what we run on executors. maybe "defines a singular task in the lifecycle of a managed Dag"? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration.proc; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; +import org.apache.gobblin.service.modules.orchestration.task.DagTask; + + +/** + * Responsible to performing the actual work for a given {@link DagTask}. + * It processes the {@link DagTask} by first initializing its state, performing actions + * based on the type of {@link DagTask} and finally submitting an event to the executor. + * @param <S> current state of the dag node + * @param <R> result after processing the dag node + */ +@Alpha +@Slf4j +public abstract class DagProc<S, R> { + + abstract protected S initialize(DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, IOException; + abstract protected R act(S state, DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, Exception; + abstract protected void sendNotification(R result, EventSubmitter eventSubmitter) throws MaybeRetryableException, IOException; + + public final void process(DagManagementStateStore dagManagementStateStore, EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) { + try { + S state = this.initializeWithRetries(dagManagementStateStore, maxRetryCount, delayRetryMillis); + R result = this.actWithRetries(state, dagManagementStateStore, maxRetryCount, delayRetryMillis); // may be pass state store too here + this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount, delayRetryMillis); + log.info("Successfully processed Dag Request"); + } catch (Exception ex) { + throw new RuntimeException("Cannot process Dag Request: ", ex); + } + } + + protected final S 
initializeWithRetries(DagManagementStateStore dagManagementStateStore, int maxRetryCount, long delayRetryMillis) throws IOException { + for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) { + try { + return this.initialize(dagManagementStateStore); + } catch (MaybeRetryableException e) { + if (retryCount < maxRetryCount - 1) { // Don't wait before the last retry + waitBeforeRetry(delayRetryMillis); + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + throw new RuntimeException("Max retry attempts reached. Cannot initialize Dag"); + } Review Comment: 1. try reworking more generically as: ``` protected final <T> T execWithRetries(Supplier<T> exec, int maxRetryCount, long delayRetryMillis) { ... } ``` and call like: ``` R result = this.execWithRetries(() -> this.act(state, dagManagementStateStore), maxRetryCount, delayRetryMillis) ``` 2. probably don't wrap exceptions, given that's what `process` already plans to do... unless you're merely trying to tunnel them past the `Supplier`'s exception signature. if so, then create a `CheckedExceptionSupplier` in the vein of - https://github.com/apache/gobblin/blob/028b85f587e3c1e6afa5d8662fe9ed3f0087568d/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java#L31 3. the final missing piece is to remain aware of the timeframe for the task's lease expiration. when it's close (or already passed) do not try (and certainly don't retry). 
instead throw a particular exception to mark the situation ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java: ########## @@ -311,4 +315,40 @@ protected static long getUTCTimeFromDelayPeriod(long delayPeriodMillis) { Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant()); return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date); } + + /** + * Attempts to acquire lease for a given {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} + * through lease arbitration and if it fails, it will create and schedule a reminder trigger to check back again. + * @param jobProps + * @param flowAction + * @param eventTimeMillis + * @return optionally leaseObtainedStatus if acquired; otherwise schedule reminder to check back again. + * @throws IOException + */ + public MultiActiveLeaseArbiter.LeaseAttemptStatus getLeaseOnDagAction(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException { + + if (multiActiveLeaseArbiter.isPresent()) { + MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis); + if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) { + this.leaseObtainedCount.inc(); + log.info("Successfully acquired lease for dag action: {}", flowAction); + } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) { + this.leasedToAnotherStatusCount.inc(); + scheduleReminderForEvent(jobProps, + (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, eventTimeMillis); + } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) { + this.noLongerLeasingStatusCount.inc(); + log.info("Received type of leaseAttemptStatus: [{}, eventTimestamp: {}] ", leaseAttemptStatus.getClass().getName(), + eventTimeMillis); + } + return leaseAttemptStatus; + } else { + throw new 
RuntimeException(String.format("Multi-active scheduler is not enabled so trigger event should not be " + + "handled with this method.")); + } + } + + public MultiActiveLeaseArbiter getMultiActiveLeaseArbiter() { + return this.multiActiveLeaseArbiter.get(); + } Review Comment: no need for this, if you indeed modify the existing `handleTriggerEvent` on line 109 ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration.proc; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import com.google.common.collect.Maps; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.runtime.api.DagActionStore; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerUtils; +import org.apache.gobblin.service.modules.orchestration.TimingEventUtils; +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; +import org.apache.gobblin.service.modules.orchestration.task.DagTask; +import org.apache.gobblin.service.modules.orchestration.task.KillDagTask; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + +import static org.apache.gobblin.service.ExecutionStatus.CANCELLED; + + +/** + * An implementation of {@link DagProc} for killing {@link DagTask}. 
+ */ +@Slf4j +@Alpha +public final class KillDagProc extends DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> { + + private KillDagTask killDagTask; + + public KillDagProc(KillDagTask killDagTask) { + this.killDagTask = killDagTask; + } Review Comment: tip: mark this `final` and implement via `@RequiredArgsConstructor` ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java: ########## @@ -80,6 +83,8 @@ public String load(String key) throws Exception { protected Orchestrator orchestrator; protected boolean isMultiActiveSchedulerEnabled; protected FlowCatalog flowCatalog; + private DagTaskStream dagTaskStream; Review Comment: this change monitor shouldn't use a task stream as such (that's exclusively for the `DagProcessingEngine`.) instead let it write to the `DagManagement` API ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration.proc; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import com.google.common.collect.Maps; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.runtime.api.DagActionStore; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerUtils; +import org.apache.gobblin.service.modules.orchestration.TimingEventUtils; +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; +import org.apache.gobblin.service.modules.orchestration.task.DagTask; +import org.apache.gobblin.service.modules.orchestration.task.KillDagTask; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + +import static org.apache.gobblin.service.ExecutionStatus.CANCELLED; + + +/** + * An implementation of {@link DagProc} for killing {@link DagTask}. 
+ */ +@Slf4j +@Alpha +public final class KillDagProc extends DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> { + + private KillDagTask killDagTask; + + public KillDagProc(KillDagTask killDagTask) { + this.killDagTask = killDagTask; + } + + @Override + protected List<Dag.DagNode<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) throws IOException { + String dagToCancel = this.killDagTask.getKillDagId().toString(); Review Comment: nit: since it's a `String`, not a dag, I'd name `dagId` or `dagIdToCancel` anyway, for such a short impl, (unless you're planning to log it), I'd just make a one-liner w/o an intermediate var ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration.proc; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import com.google.common.collect.Maps; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.runtime.api.DagActionStore; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerUtils; +import org.apache.gobblin.service.modules.orchestration.TimingEventUtils; +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; +import org.apache.gobblin.service.modules.orchestration.task.DagTask; +import org.apache.gobblin.service.modules.orchestration.task.KillDagTask; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + +import static org.apache.gobblin.service.ExecutionStatus.CANCELLED; + + +/** + * An implementation of {@link DagProc} for killing {@link DagTask}. 
+ */ +@Slf4j +@Alpha +public final class KillDagProc extends DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> { + + private KillDagTask killDagTask; + + public KillDagProc(KillDagTask killDagTask) { + this.killDagTask = killDagTask; + } + + @Override + protected List<Dag.DagNode<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) throws IOException { + String dagToCancel = this.killDagTask.getKillDagId().toString(); + return dagManagementStateStore.getJobs(dagToCancel); + } + + /** + * Post initialization of the Dag with the current state, it will identify the {@link Dag.DagNode}s to be killed + * and cancel the job on the executor. The return type is kept as {@link Object} since we might want to refactor + * or add more responsibility as part of the actions taken. Hence, after completing all possible scenarios, + * it will make sense to update the method signature with its appropriate type. + * @param dagNodesToCancel + * @return + * @throws InterruptedException + * @throws ExecutionException + * @throws IOException + */ + @Override + protected Dag<JobExecutionPlan> act(List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel, DagManagementStateStore dagManagementStateStore) throws Exception { + String dagToCancel = this.killDagTask.getKillDagId().toString(); + + log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size()); + for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) { + killDagNode(dagNodeToCancel); + } + dagManagementStateStore.getDag(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED); + dagManagementStateStore.getDag(dagToCancel).setMessage("Flow killed by request"); + dagManagementStateStore.removeDagActionFromStore(this.killDagTask.getKillDagId(), DagActionStore.FlowActionType.KILL); + return dagManagementStateStore.getDag(dagToCancel); + + } + + @Override + protected void sendNotification(Dag<JobExecutionPlan> dag, EventSubmitter eventSubmitter) throws 
MaybeRetryableException { + for(Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dag.getNodes()) { Review Comment: please update your IDE to catch when you forget the space between `for (`... this happens all over this PR ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration.proc; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import com.google.common.collect.Maps; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.runtime.api.DagActionStore; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerUtils; +import org.apache.gobblin.service.modules.orchestration.TimingEventUtils; +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; +import org.apache.gobblin.service.modules.orchestration.task.DagTask; +import org.apache.gobblin.service.modules.orchestration.task.KillDagTask; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + +import static org.apache.gobblin.service.ExecutionStatus.CANCELLED; + + +/** + * An implementation of {@link DagProc} for killing {@link DagTask}. 
+ */ +@Slf4j +@Alpha +public final class KillDagProc extends DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> { + + private KillDagTask killDagTask; + + public KillDagProc(KillDagTask killDagTask) { + this.killDagTask = killDagTask; + } + + @Override + protected List<Dag.DagNode<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) throws IOException { + String dagToCancel = this.killDagTask.getKillDagId().toString(); + return dagManagementStateStore.getJobs(dagToCancel); + } + + /** + * Post initialization of the Dag with the current state, it will identify the {@link Dag.DagNode}s to be killed + * and cancel the job on the executor. The return type is kept as {@link Object} since we might want to refactor + * or add more responsibility as part of the actions taken. Hence, after completing all possible scenarios, + * it will make sense to update the method signature with its appropriate type. + * @param dagNodesToCancel + * @return + * @throws InterruptedException + * @throws ExecutionException + * @throws IOException + */ + @Override + protected Dag<JobExecutionPlan> act(List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel, DagManagementStateStore dagManagementStateStore) throws Exception { + String dagToCancel = this.killDagTask.getKillDagId().toString(); + + log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size()); + for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) { + killDagNode(dagNodeToCancel); + } + dagManagementStateStore.getDag(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED); + dagManagementStateStore.getDag(dagToCancel).setMessage("Flow killed by request"); + dagManagementStateStore.removeDagActionFromStore(this.killDagTask.getKillDagId(), DagActionStore.FlowActionType.KILL); Review Comment: nits: 1. move `dagToCancel` init after the `for` loop (BTW, it's a dag ID, not a DAG) 2. call `dagMgmtStateStore.getDag(dagToCancel)` only once 3. 
decide whether there is still a need for an intermediate `dagToCancel` var ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration.proc; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import com.google.common.collect.Maps; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.runtime.api.DagActionStore; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerUtils; +import org.apache.gobblin.service.modules.orchestration.TimingEventUtils; +import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException; +import org.apache.gobblin.service.modules.orchestration.task.DagTask; +import org.apache.gobblin.service.modules.orchestration.task.KillDagTask; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + +import static org.apache.gobblin.service.ExecutionStatus.CANCELLED; + + +/** + * An implementation of {@link DagProc} for killing {@link DagTask}. 
+ */ +@Slf4j +@Alpha +public final class KillDagProc extends DagProc<List<Dag.DagNode<JobExecutionPlan>>, Dag<JobExecutionPlan>> { + + private KillDagTask killDagTask; + + public KillDagProc(KillDagTask killDagTask) { + this.killDagTask = killDagTask; + } + + @Override + protected List<Dag.DagNode<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) throws IOException { + String dagToCancel = this.killDagTask.getKillDagId().toString(); + return dagManagementStateStore.getJobs(dagToCancel); + } + + /** + * Post initialization of the Dag with the current state, it will identify the {@link Dag.DagNode}s to be killed + * and cancel the job on the executor. The return type is kept as {@link Object} since we might want to refactor + * or add more responsibility as part of the actions taken. Hence, after completing all possible scenarios, + * it will make sense to update the method signature with its appropriate type. Review Comment: out of date -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
