phet commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1701582523
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -319,14 +322,30 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
}
public void remove(Spec spec, Properties headers) throws IOException {
+ URI uri = spec.getUri();
// TODO: Evolve logic to cache and reuse previously compiled JobSpecs
// .. this will work for Identity compiler but not always for multi-hop.
// Note: Current logic assumes compilation is consistent between all executions
if (spec instanceof FlowSpec) {
- //Send the dag to the DagManager to stop it.
- //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
- _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
- this.dagManager.stopDag(spec.getUri());
+ String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+ String flowName = FlowSpec.Utils.getFlowName(uri);
+ if (this.flowLaunchHandler.isPresent()) {
+ List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+ _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+ for (long flowExecutionId : flowExecutionIds) {
+ DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
+ DagActionStore.DagActionType.KILL);
+ DagActionStore.LeaseParams leaseObject = new DagActionStore.LeaseParams(killDagAction, false,
Review Comment:
nit: `leaseParams` rather than `leaseObject`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########
Review Comment:
let's keep PRs as small and as cohesive as possible. next time, I'd
strongly encourage you to put this and related changes into a PR separate
from something like the non-transient exceptions work
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
dagTask.conclude();
} catch (Exception e) {
log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
+ if (KafkaJobStatusMonitor.isThrowableInstanceOf(e, this.nonRetryableExceptions)) {
+ // conclude the lease so that it is not retried, if the dag proc fails with non-transient exception
Review Comment:
suggest logging here (although it would be better to log within the
predicate impl, i.e. `Exception -> boolean`)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -122,6 +134,7 @@ static class DagProcEngineThread implements Runnable {
private final DagProcFactory dagProcFactory;
private final DagManagementStateStore dagManagementStateStore;
private final DagProcessingEngineMetrics dagProcEngineMetrics;
+ private final List<Class<? extends Exception>> nonRetryableExceptions;
Review Comment:
let's use a predicate as the interface - i.e. a function of `Exception ->
boolean`, but you could certainly implement one instance of that Strategy
interface in terms of a `List<Class<? extends Exception>>`
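
a minimal sketch of that shape, for illustration only: `java.util.function.Predicate` stands in for the Strategy interface, and the class name `NonRetryablePredicateSketch`, the factory `fromTypes`, and the example exception types are all hypothetical, not names from this PR.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical holder class; not part of the PR.
final class NonRetryablePredicateSketch {

  // One implementation of the `Exception -> boolean` strategy,
  // expressed in terms of a list of exception classes.
  static Predicate<Exception> fromTypes(List<Class<? extends Exception>> types) {
    return e -> types.stream().anyMatch(type -> type.isInstance(e));
  }

  public static void main(String[] args) {
    // Example types only; the real list would come from configuration.
    List<Class<? extends Exception>> types =
        Arrays.asList(IllegalArgumentException.class, UnsupportedOperationException.class);
    Predicate<Exception> isNonRetryable = fromTypes(types);

    System.out.println(isNonRetryable.test(new IllegalArgumentException("bad config"))); // true
    System.out.println(isNonRetryable.test(new IOException("transient")));               // false
  }
}
```

callers would then depend only on `Predicate<Exception>`, so the list-backed version could later be swapped for something else without touching them.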
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -426,7 +426,15 @@ public static long getExecutionIdFromTableName(String tableName) {
protected abstract org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event);
- public static boolean isThrowableInstanceOf(Throwable exception, List<Class<? extends Exception>> typesList) {
- return typesList.stream().anyMatch(e -> e.isInstance(exception));
+ public static boolean isThrowableInstanceOf(Throwable exception, List<Class<? extends Exception>> exceptionsList) {
Review Comment:
this changes the semantics. perhaps that's OK, but a more precise name
along w/ some javadoc might help. e.g. `isThrowableOrCauseInstanceOf` or
`isThrowableInstanceOfRecursive`
also, this seems better suited to living within a utils class, rather than
here in KJSM, which was probably just the first place we realized we needed
such functionality.
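
a rough sketch of what that could look like, assuming the changed semantics means "match the throwable or anything in its cause chain" (the hunk above doesn't show the new body). the utils class name `ThrowableCheckSketch` and the `MAX_CAUSE_DEPTH` bound are hypothetical:

```java
import java.util.List;

// Hypothetical utils class; the name is an assumption, not from the PR.
final class ThrowableCheckSketch {

  // Assumed safety bound so a cyclic cause chain cannot loop forever.
  private static final int MAX_CAUSE_DEPTH = 64;

  /**
   * Returns true if {@code throwable}, or any throwable reachable through
   * {@link Throwable#getCause()}, is an instance of one of {@code types}.
   */
  static boolean isThrowableInstanceOfRecursive(Throwable throwable,
      List<Class<? extends Exception>> types) {
    int depth = 0;
    for (Throwable t = throwable; t != null && depth < MAX_CAUSE_DEPTH; t = t.getCause(), depth++) {
      final Throwable current = t;
      if (types.stream().anyMatch(type -> type.isInstance(current))) {
        return true;
      }
    }
    return false;
  }
}
```

the javadoc spells out that causes are inspected too, which is the part the current name hides.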
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
dagTask.conclude();
} catch (Exception e) {
log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
+ if (KafkaJobStatusMonitor.isThrowableInstanceOf(e, this.nonRetryableExceptions)) {
+ // conclude the lease so that it is not retried, if the dag proc fails with non-transient exception
+ dagTask.conclude();
+ dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
Review Comment:
couldn't this live entirely encapsulated within `DagProc::process`? that
would give a normal return, which means the DPE would next call
`DagTask::conclude`
for that approach, initialize the `DagProcFactory` w/ the exception predicate
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java:
##########
@@ -68,7 +68,7 @@ protected boolean isDagStillPresent(Optional<Dag<JobExecutionPlan>> dag, DagMana
if (!dagManagementStateStore.existsJobDagAction(dagAction.getFlowGroup(), dagAction.getFlowName(),
dagAction.getFlowExecutionId(), dagAction.getJobName(), dagAction.getDagActionType())) {
- log.warn("Dag action {} is cleaned up from DMSS. No further action is required.", dagAction);
+ log.info("Dag action {} is cleaned up from DMSS. No further action is required.", dagAction);
return false;
Review Comment:
don't we already hold a lease by the time `act` is called? if so, finding
the dag action already cleaned up would be exceedingly rare (and really no
different than in any other `DagProc::act` derived class impl). hence, I'd
suggest removing this check. but if we really believe we need it, let's put
the impl into the `DagProc` base class