[
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=881042&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-881042
]
ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 20/Sep/23 23:03
Start Date: 20/Sep/23 23:03
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1332232927
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+ /**
+ * Currently, it is handling just the launch of a {@link Dag} request via REST client for adhoc flows
+ * @param flowGroup
+ * @param flowName
+ * @param triggerTimeStamp
+ */
+ void launchFlow(String flowGroup, String flowName, long triggerTimeStamp);
+
+ /**
+ * Currently, it is handling just the resume of a {@link Dag} request via REST client for adhoc flows
Review Comment:
For this and the other Javadocs besides `LAUNCH`, "for adhoc flows" is
not accurate. This is a resume call from the REST client, but the caller could be
resuming a scheduled flow.
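One possible rewording of the `resumeFlow` Javadoc is sketched below. The interface name `ResumeFlowJavadocSketch` is hypothetical and stands in for `DagManagement`; the method signature is copied from this PR, while the parameter and `@throws` descriptions are assumptions about intent, not documented behavior.

```java
import java.io.IOException;

// Hypothetical stand-in for DagManagement, used only to show the Javadoc wording.
interface ResumeFlowJavadocSketch {

  /**
   * Handles a resume request for a {@code Dag} submitted via the REST client.
   * The flow being resumed may be either an adhoc flow or a scheduled flow.
   *
   * @param flowGroup group of the flow to resume
   * @param flowName name of the flow to resume
   * @param flowExecutionId execution id of the flow to resume
   * @param triggerTimeStamp time at which the resume was triggered (assumed meaning)
   * @throws IOException if the resume request cannot be processed (assumed condition)
   * @throws InterruptedException if the calling thread is interrupted (assumed condition)
   */
  void resumeFlow(String flowGroup, String flowName, String flowExecutionId, long triggerTimeStamp)
      throws IOException, InterruptedException;
}
```

The same wording change would apply to the kill/cancel Javadoc, since neither operation is limited to adhoc flows.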
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,70 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+ /**
+ * Currently, it is handling just the launch of a {@link Dag} request via REST client for adhoc flows
+ * @param flowGroup
+ * @param flowName
+ * @param triggerTimeStamp
+ */
+ void launchFlow(String flowGroup, String flowName, long triggerTimeStamp);
+
+ /**
+ * Currently, it is handling just the resume of a {@link Dag} request via REST client for adhoc flows
+ * @param flowGroup
+ * @param flowName
+ * @param flowExecutionId
+ * @param triggerTimeStamp
+ * @throws IOException
+ */
+ void resumeFlow(String flowGroup, String flowName, String flowExecutionId, long triggerTimeStamp)
+ throws IOException, InterruptedException;
+
+ /**
+ * Currently, it is handling just the kill/cancel of a {@link Dag} request via REST client for adhoc flows
+ * @param flowGroup
+ * @param flowName
+ * @param flowExecutionId
+ * @param triggerTimeStamp
Review Comment:
Add the `@throws` tag to this Javadoc to be consistent with the one above.
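As a sketch of what that could look like: the interface name `KillFlowJavadocSketch` is hypothetical, the `throws IOException` clause is taken from the `killFlow` override in `DagTaskStream` in this PR, and the condition described in the `@throws` tag is an assumption.

```java
import java.io.IOException;

// Hypothetical stand-in for DagManagement; only the Javadoc shape matters here.
interface KillFlowJavadocSketch {

  /**
   * Handles a kill/cancel request for a {@code Dag} submitted via the REST client.
   *
   * @param flowGroup group of the flow to kill
   * @param flowName name of the flow to kill
   * @param flowExecutionId execution id of the flow to kill
   * @param triggerTimeStamp time at which the kill was triggered (assumed meaning)
   * @throws IOException if the kill request cannot be processed (assumed condition)
   */
  void killFlow(String flowGroup, String flowName, String flowExecutionId, long triggerTimeStamp)
      throws IOException;
}
```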
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,104 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.processor.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+ public static final String DAG_PROCESSING_ENGINE_PREFIX = "gobblin.service.dagProcessingEngine.";
+ public static final String NUM_THREADS_KEY = DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+ public static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_PROCESSING_ENGINE_PREFIX + "pollingInterval";
+
+ private static final Integer DEFAULT_NUM_THREADS = 3;
+ private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+
+ private DagTaskStream dagTaskStream;
+ private DagProcFactory dagProcFactory;
+ private DagManagementStateStore dagManagementStateStore;
+ private ScheduledExecutorService scheduledExecutorPool;
+ private Config config;
+ private Integer numThreads;
+ private Integer pollingInterval;
+ private DagProcessingEngine.Thread [] threads;
+ private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+
+
Review Comment:
It would be very helpful to have Javadoc for these classes, and perhaps
diagrams or a high-level summary of the new classes in the PR description, to really
clarify what the refactor you're proposing is.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,70 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+ /**
+ * Currently, it is handling just the launch of a {@link Dag} request via REST client for adhoc flows
+ * @param flowGroup
+ * @param flowName
+ * @param triggerTimeStamp
+ */
+ void launchFlow(String flowGroup, String flowName, long triggerTimeStamp);
Review Comment:
Leaving this as a note in case you want to document in the Javadoc that,
for adhoc flows, `triggerTimeStamp` will be "0" (see code
[here](https://github.com/apache/gobblin/blob/a28961370ffbd41e740b9e6b9238b1c9d91ae6ac/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java#L762)),
while "-1" indicates some error. Essentially, all valid values should be >= 0.
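If it helps, the convention could also be captured in a small helper. This is only a sketch: the class `TriggerTimestamps`, its constants, and its methods are hypothetical names that do not exist in the PR, and the only facts assumed are the ones above (0 for adhoc flows, -1 for an error, valid values >= 0).

```java
// Hypothetical helper illustrating the triggerTimeStamp convention described above.
final class TriggerTimestamps {
  // Value passed for adhoc flows.
  static final long ADHOC_TRIGGER_TIMESTAMP = 0L;
  // Value that indicates an error.
  static final long ERROR_TRIGGER_TIMESTAMP = -1L;

  private TriggerTimestamps() {
  }

  // All valid trigger timestamps are non-negative.
  static boolean isValid(long triggerTimeStamp) {
    return triggerTimeStamp >= 0;
  }

  // A timestamp of exactly 0 marks an adhoc flow trigger.
  static boolean isAdhoc(long triggerTimeStamp) {
    return triggerTimeStamp == ADHOC_TRIGGER_TIMESTAMP;
  }

  // Returns the timestamp unchanged, or rejects negative values such as the error marker.
  static long requireValid(long triggerTimeStamp) {
    if (!isValid(triggerTimeStamp)) {
      throw new IllegalArgumentException("Invalid triggerTimeStamp: " + triggerTimeStamp);
    }
    return triggerTimeStamp;
  }
}
```

An implementation of `launchFlow` could call something like `requireValid` up front so the error marker is rejected before any work starts.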
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,273 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagManager} would pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} defines the rules for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterator<Optional<DagTask>>, DagManagement {
+
+ @Getter
+ private final BlockingDeque<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingDeque<>();
+ private FlowTriggerHandler flowTriggerHandler;
+ private DagManagementStateStore dagManagementStateStore;
+ private DagManagerMetrics dagManagerMetrics;
+
+
+ @Override
+ public boolean hasNext() {
+ return true;
Review Comment:
Usually `hasNext` is the one that checks the queue to see if there is a task, and
`next()` does all the work in an Iterator. Right now it returns true when that may
not be the case.
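A minimal sketch of that split, assuming a plain queue-backed iterator: the class `QueueBackedStream` is hypothetical and leaves out the lease arbitration that `DagTaskStream.next()` performs in this PR.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical, simplified stream: hasNext() inspects the queue, next() consumes from it.
final class QueueBackedStream<T> implements Iterator<T> {
  private final BlockingDeque<T> queue = new LinkedBlockingDeque<>();

  // Enqueues an item; returns false only if the queue is at capacity.
  boolean add(T item) {
    return queue.offer(item);
  }

  @Override
  public boolean hasNext() {
    // True only when something is actually waiting in the queue.
    return !queue.isEmpty();
  }

  @Override
  public T next() {
    // poll() removes the head, or returns null when the queue is empty.
    T item = queue.poll();
    if (item == null) {
      throw new NoSuchElementException("No item found in the queue");
    }
    return item;
  }
}
```

With several consumer threads, `hasNext()` can return true and `next()` can still find the queue empty, so callers would need to handle `NoSuchElementException` or use a blocking call such as `take()` instead.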
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,273 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagManager} would pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} defines the rules for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager}
Review Comment:
What does "if available" mean? Is this stream of tasks exclusive to this
participant or common among all?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,273 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagManager} would pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} defines the rules for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterator<Optional<DagTask>>, DagManagement {
+
+ @Getter
+ private final BlockingDeque<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingDeque<>();
+ private FlowTriggerHandler flowTriggerHandler;
+ private DagManagementStateStore dagManagementStateStore;
+ private DagManagerMetrics dagManagerMetrics;
+
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+
+ @Override
+ public Optional<DagTask> next() {
+
+ DagActionStore.DagAction dagAction = dagActionQueue.peek();
+ try {
+ Preconditions.checkArgument(dagAction != null, "No Dag Action found in the queue");
+ Properties jobProps = getJobProperties(dagAction);
+ Optional<MultiActiveLeaseArbiter.LeaseObtainedStatus> leaseObtainedStatus =
+ flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis()).toJavaUtil();
+ if(leaseObtainedStatus.isPresent()) {
+ DagTask dagTask = createDagTask(dagAction, leaseObtainedStatus.get());
+ return Optional.of(dagTask);
+ }
+ } catch (Exception ex) {
+ //TODO: need to handle exceptions gracefully
+ throw new RuntimeException(ex);
+ }
+ return Optional.empty();
+ }
+
+ public boolean add(DagActionStore.DagAction dagAction) throws IOException {
+ return this.dagActionQueue.offer(dagAction);
+ }
+
+ public DagActionStore.DagAction take() {
+ return this.dagActionQueue.poll();
+ }
+
+ public DagTask createDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ DagActionStore.FlowActionType flowActionType = dagAction.getFlowActionType();
+ switch (flowActionType) {
+ case KILL:
+ return new KillDagTask(dagAction, leaseObtainedStatus);
+ case RESUME:
+ case LAUNCH:
+ case ADVANCE:
+ default:
+ log.warn("It should not reach here. Yet to provide implementation.");
+ return null;
+ }
+ }
+
+ @Override
+ public void launchFlow(String flowGroup, String flowName, long triggerTimeStamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not supported.");
+ }
+
+ @Override
+ public void resumeFlow(String flowGroup, String flowName, String flowExecutionId, long triggerTimeStamp) throws IOException, InterruptedException {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not supported.");
+ }
+
+ @Override
+ public void killFlow(String flowGroup, String flowName, String flowExecutionId, long produceTimestamp) throws IOException {
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
+ if(!this.dagManagementStateStore.getDagIdToDags().containsKey(dagId.toString())) {
+ log.info("Invalid dag since not present in map. Hence cannot cancel it");
Review Comment:
Change this to a warning-level log message.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,273 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagManager} would pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} defines the rules for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterator<Optional<DagTask>>, DagManagement {
+
+ @Getter
+ private final BlockingDeque<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingDeque<>();
+ private FlowTriggerHandler flowTriggerHandler;
+ private DagManagementStateStore dagManagementStateStore;
+ private DagManagerMetrics dagManagerMetrics;
+
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+
+ @Override
+ public Optional<DagTask> next() {
+
+ DagActionStore.DagAction dagAction = dagActionQueue.peek();
+ try {
+ Preconditions.checkArgument(dagAction != null, "No Dag Action found in the queue");
+ Properties jobProps = getJobProperties(dagAction);
+ Optional<MultiActiveLeaseArbiter.LeaseObtainedStatus> leaseObtainedStatus =
+ flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis()).toJavaUtil();
+ if(leaseObtainedStatus.isPresent()) {
+ DagTask dagTask = createDagTask(dagAction, leaseObtainedStatus.get());
+ return Optional.of(dagTask);
+ }
+ } catch (Exception ex) {
+ //TODO: need to handle exceptions gracefully
+ throw new RuntimeException(ex);
+ }
+ return Optional.empty();
+ }
+
+ public boolean add(DagActionStore.DagAction dagAction) throws IOException {
+ return this.dagActionQueue.offer(dagAction);
+ }
+
+ public DagActionStore.DagAction take() {
+ return this.dagActionQueue.poll();
+ }
+
+ public DagTask createDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ DagActionStore.FlowActionType flowActionType = dagAction.getFlowActionType();
+ switch (flowActionType) {
+ case KILL:
+ return new KillDagTask(dagAction, leaseObtainedStatus);
+ case RESUME:
+ case LAUNCH:
+ case ADVANCE:
+ default:
+ log.warn("It should not reach here. Yet to provide implementation.");
+ return null;
+ }
+ }
+
+ @Override
+ public void launchFlow(String flowGroup, String flowName, long triggerTimeStamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not supported.");
+ }
+
+ @Override
+ public void resumeFlow(String flowGroup, String flowName, String flowExecutionId, long triggerTimeStamp) throws IOException, InterruptedException {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not supported.");
+ }
+
+ @Override
+ public void killFlow(String flowGroup, String flowName, String flowExecutionId, long produceTimestamp) throws IOException {
Review Comment:
This is called by the `DagActionStoreMonitor`, right?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,273 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagManager} would pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} defines the rules for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterator<Optional<DagTask>>, DagManagement {
+
+ @Getter
+ private final BlockingDeque<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingDeque<>();
+ private FlowTriggerHandler flowTriggerHandler;
+ private DagManagementStateStore dagManagementStateStore;
+ private DagManagerMetrics dagManagerMetrics;
+
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+
+ @Override
+ public Optional<DagTask> next() {
+
+ DagActionStore.DagAction dagAction = dagActionQueue.peek();
+ try {
+ Preconditions.checkArgument(dagAction != null, "No Dag Action found in
the queue");
+ Properties jobProps = getJobProperties(dagAction);
+ Optional<MultiActiveLeaseArbiter.LeaseObtainedStatus>
leaseObtainedStatus =
+ flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction,
System.currentTimeMillis()).toJavaUtil();
+ if(leaseObtainedStatus.isPresent()) {
+ DagTask dagTask = createDagTask(dagAction, leaseObtainedStatus.get());
+ return Optional.of(dagTask);
+ }
+ } catch (Exception ex) {
+ //TODO: need to handle exceptions gracefully
+ throw new RuntimeException(ex);
+ }
+ return Optional.empty();
+ }
+
+ public boolean add(DagActionStore.DagAction dagAction) throws IOException {
+ return this.dagActionQueue.offer(dagAction);
+ }
+
+ public DagActionStore.DagAction take() {
+ return this.dagActionQueue.poll();
+ }
+
+ public DagTask createDagTask(DagActionStore.DagAction dagAction,
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ DagActionStore.FlowActionType flowActionType =
dagAction.getFlowActionType();
+ switch (flowActionType) {
+ case KILL:
+ return new KillDagTask(dagAction, leaseObtainedStatus);
+ case RESUME:
+ case LAUNCH:
+ case ADVANCE:
+ default:
+ log.warn("It should not reach here. Yet to provide implementation.");
+ return null;
+ }
+ }
+
+ @Override
+ public void launchFlow(String flowGroup, String flowName, long
triggerTimeStamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not
supported.");
+ }
+
+ @Override
+ public void resumeFlow(String flowGroup, String flowName, String
flowExecutionId, long triggerTimeStamp) throws IOException,
InterruptedException {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not
supported.");
+ }
+
+ @Override
+ public void killFlow(String flowGroup, String flowName, String
flowExecutionId, long produceTimestamp) throws IOException {
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(flowGroup,
flowName, flowExecutionId);
+ if(!this.dagManagementStateStore.getDagIdToDags().containsKey(dagId.toString())) {
+ log.info("Invalid dag since not present in map. Hence cannot cancel it");
+ return;
+ }
+ DagActionStore.DagAction killAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.KILL);
+ if(!add(killAction)) {
+ throw new IOException("Could not add kill dag action: " + killAction +
" to the queue.");
+ }
+ }
+ /**
+ * Check if the SLA is configured for the flow this job belongs to.
+ * If it is, this method will try to cancel the job when SLA is reached.
+ *
+ * @param node dag node of the job
+ * @return true if the job is killed because it reached sla
+ * @throws ExecutionException exception
+ * @throws InterruptedException exception
+ */
+
+ @Override
+ public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan>
node) throws ExecutionException, InterruptedException {
+ //TODO: need to distribute the responsibility outside of this class
+ long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+ long currentTime = System.currentTimeMillis();
+ String dagId = DagManagerUtils.generateDagId(node).toString();
+
+ long flowSla;
+ if (this.dagManagementStateStore.getDagToSLA().containsKey(dagId)) {
+ flowSla = this.dagManagementStateStore.getDagToSLA().get(dagId);
+ } else {
+ try {
+ flowSla = DagManagerUtils.getFlowSLA(node);
+ } catch (ConfigException e) {
+ log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid format, using default SLA of {}",
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+ DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
+ flowSla = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+ }
+ this.dagManagementStateStore.getDagToSLA().put(dagId, flowSla);
+ }
+
+ if (currentTime > flowStartTime + flowSla) {
+ log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+ dagManagerMetrics.incrementExecutorSlaExceeded(node);
Review Comment:
We should emit a flow-level SLA kill event here, or at least add a TODO for
it. The metric below looks like it is tracked per executor, not at the flow level.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information
about dags, dag nodes and their job states.
+ * Currently, this store maintains and utilizes in-memory references about
dags and their job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply
clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from
MySQL store.
Review Comment:
Will we change this class or create a new one to implement
`DagManagementStateStore`? Also, there is a missing space before the open paren
of the class.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/exception/MaybeRetryableException.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.exception;
+
+import org.apache.gobblin.service.modules.orchestration.processor.DagProc;
+
+
+/**
+ * Exception defined for handling the retries while processing {@link DagProc}
+ */
+
+public abstract class MaybeRetryableException extends Throwable {
Review Comment:
Would `PotentialRetryableException` be a clearer name?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/processor/KillDagProc.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.processor;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc {
+
+ private KillDagTask killDagTask;
+ private DagManagementStateStore dagManagementStateStore;
+ private MetricContext metricContext;
+ private Optional<EventSubmitter> eventSubmitter;
+
+
+ public KillDagProc(KillDagTask killDagTask, DagManagementStateStore
dagManagementStateStore, MetricContext metricContext, Optional<EventSubmitter>
eventSubmitter) throws IOException {
+ this.killDagTask = killDagTask;
+ this.dagManagementStateStore = dagManagementStateStore;
+ this.metricContext = metricContext;
+ this.eventSubmitter = eventSubmitter;
+ }
+
+ public KillDagProc(KillDagTask killDagTask) {
+ this.killDagTask = killDagTask;
+ }
+
+ @Override
+ protected List<Dag.DagNode<JobExecutionPlan>>
initialize(DagManagementStateStore dagManagementStateStore) {
+ Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs =
dagManagementStateStore.getDagToJobs();
+ if(dagToJobs.containsKey(this.killDagTask.getKillDagId().toString())) {
+ return dagToJobs.get(this.killDagTask.getKillDagId().toString());
+ }
+ return Lists.newArrayList();
+ }
+
+ /**
+ * Post initialization of the Dag with the current state, it will identify
the {@link Dag.DagNode}s to be killed
+ * and cancel the job on the executor. The return type is kept as {@link
Object} since we might want to refactor
+ * or add more responsibility as part of the actions taken. Hence, after
completing all possible scenarios,
+ * it will make sense to update the method signature with its appropriate
type.
+ * @param state
+ * @return
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws IOException
+ */
+ @Override
+ protected Object act(Object state) throws Exception {
Review Comment:
Why do we have a generic input and return type of `Object`? Shouldn't the
state just be the `List<Dag.DagNode<JobExecutionPlan>>` that you cast to below?
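One way the typed signature could look is sketched below, assuming the base class can be made generic. `TypedDagProc`, `TypedKillProc`, and the plain `String` job names are hypothetical stand-ins for illustration, not the PR's actual classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for DagProc. S is the state type returned by
// initialize() and consumed by act(); R is the result type of act().
abstract class TypedDagProc<S, R> {
  protected abstract S initialize();

  protected abstract R act(S state);

  // Template method: the compiler checks that initialize() and act() agree on S,
  // so no cast from Object is needed.
  final R process() {
    return act(initialize());
  }
}

// Hypothetical kill processor. List<String> stands in for
// List<Dag.DagNode<JobExecutionPlan>>; the result lists the cancelled jobs.
class TypedKillProc extends TypedDagProc<List<String>, List<String>> {
  private final List<String> jobsToKill;

  TypedKillProc(List<String> jobsToKill) {
    this.jobsToKill = jobsToKill;
  }

  @Override
  protected List<String> initialize() {
    return new ArrayList<>(jobsToKill);
  }

  @Override
  protected List<String> act(List<String> state) {
    List<String> cancelled = new ArrayList<>();
    for (String job : state) {
      cancelled.add(job + ":CANCELLED");
    }
    return cancelled;
  }
}

class TypedDagProcDemo {
  public static void main(String[] args) {
    System.out.println(new TypedKillProc(Arrays.asList("job1", "job2")).process());
  }
}
```

With type parameters on the base class, each processor declares its own state and result types, and the signature can still be widened later without falling back to `Object`.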
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios
for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+ /**
+ * Currently, it is handling just the launch of a {@link Dag} request via
REST client for adhoc flows
+ * @param flowGroup
+ * @param flowName
+ * @param triggerTimeStamp
+ */
+ void launchFlow(String flowGroup, String flowName, long triggerTimeStamp);
+
+ /**
+ * Currently, it is handling just the resume of a {@link Dag} request via
REST client for adhoc flows
+ * @param flowGroup
+ * @param flowName
+ * @param flowExecutionId
+ * @param triggerTimeStamp
+ * @throws IOException
+ */
+ void resumeFlow(String flowGroup, String flowName, String flowExecutionId,
long triggerTimeStamp)
+ throws IOException, InterruptedException;
+
+ /**
+ * Currently, it is handling just the kill/cancel of a {@link Dag} request
via REST client for adhoc flows
+ * @param flowGroup
+ * @param flowName
+ * @param flowExecutionId
+ * @param triggerTimeStamp
+ */
+ void killFlow(String flowGroup, String flowName, String flowExecutionId,
long triggerTimeStamp)
Review Comment:
You could argue that any action can have a `trigger/EventTimestamp`
associated with it. To stay unambiguous, we may want to reserve `trigger` for
scheduled flow event triggers.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.processor.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+ public static final String DAG_PROCESSING_ENGINE_PREFIX =
"gobblin.service.dagProcessingEngine.";
+ public static final String NUM_THREADS_KEY = DAG_PROCESSING_ENGINE_PREFIX +
"numThreads";
+ public static final String JOB_STATUS_POLLING_INTERVAL_KEY =
DAG_PROCESSING_ENGINE_PREFIX + "pollingInterval";
+
+ private static final Integer DEFAULT_NUM_THREADS = 3;
+ private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+
+ private DagTaskStream dagTaskStream;
+ private DagProcFactory dagProcFactory;
+ private DagManagementStateStore dagManagementStateStore;
+ private ScheduledExecutorService scheduledExecutorPool;
+ private Config config;
+ private Integer numThreads;
+ private Integer pollingInterval;
+ private DagProcessingEngine.Thread [] threads;
+ private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+
+
+ public DagProcessingEngine(Config config, DagTaskStream dagTaskStream,
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore,
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+ this.config = config;
+ this.dagTaskStream = dagTaskStream;
+ this.dagProcFactory = dagProcFactory;
+ this.dagManagementStateStore = dagManagementStateStore;
+ this.multiActiveLeaseArbiter = multiActiveLeaseArbiter;
+ this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY,
DEFAULT_NUM_THREADS);
+ this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+ this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+ this.threads = new DagProcessingEngine.Thread[this.numThreads];
+ for(int i=0; i < this.numThreads; i++) {
+ Thread thread = new Thread(dagTaskStream, dagProcFactory,
dagManagementStateStore, multiActiveLeaseArbiter);
+ this.threads[i] = thread;
+ }
+ }
+
+
+ @AllArgsConstructor
+ private static class Thread implements Runnable {
+ private DagTaskStream dagTaskStream;
+ private DagProcFactory dagProcFactory;
+ private DagManagementStateStore dagManagementStateStore;
+ private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+
+ @Override
+ public void run() {
+ try {
+ while (dagTaskStream.hasNext()) {
+ Optional<DagTask> dagTask = getNextTask();
+ if (dagTask.isPresent()) {
+ DagProc dagProc = (DagProc) dagTask.get().host(dagProcFactory);
Review Comment:
When is `dagTask` not present? Is that when we don't get the lease, or is it
unrelated? What does "getting the dag proc" mean?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,33 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+ public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+ public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan>
dagNode);
Review Comment:
When are these methods called? Do you ever want an `updateJobState`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.orchestration.processor.DagProc;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given
{@link DagTask}.
+ */
+
+@Alpha
+@Slf4j
+public class DagProcFactory implements DagTaskVisitor<DagProc> {
+
+ private DagManagementStateStore dagManagementStateStore;
+ private JobStatusRetriever jobStatusRetriever;
+ private FlowStatusGenerator flowStatusGenerator;
+ private UserQuotaManager quotaManager;
+ private SpecCompiler specCompiler;
+ private FlowCatalog flowCatalog;
+ private FlowCompilationValidationHelper flowCompilationValidationHelper;
+ private Config config;
+ private Optional<EventSubmitter> eventSubmitter;
+ private boolean instrumentationEnabled;
+
Review Comment:
Let's use Guice to inject these dependencies when we actually instantiate this class.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information
about dags, dag nodes and their job states.
+ * Currently, this store maintains and utilizes in-memory references about
dags and their job states and is used
Review Comment:
Suggested wording: "This store represents the current state, which uses
in-memory references about...". I'm not sure "Currently" makes sense here,
because `InMemoryDagManagementStateStore` will always use in-memory references.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagManager} would pull from
to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} defines the rules
for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if
available to {@link DagManager}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterator<Optional<DagTask>>,
DagManagement {
+
+ @Getter
+ private final BlockingDeque<DagActionStore.DagAction> dagActionQueue = new
LinkedBlockingDeque<>();
+ private FlowTriggerHandler flowTriggerHandler;
+ private DagManagementStateStore dagManagementStateStore;
+ private DagManagerMetrics dagManagerMetrics;
+
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+
+ @Override
+ public Optional<DagTask> next() {
+
+ DagActionStore.DagAction dagAction = dagActionQueue.peek();
+ try {
+ Preconditions.checkArgument(dagAction != null, "No Dag Action found in
the queue");
+ Properties jobProps = getJobProperties(dagAction);
+ Optional<MultiActiveLeaseArbiter.LeaseObtainedStatus>
leaseObtainedStatus =
+ flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction,
System.currentTimeMillis()).toJavaUtil();
+ if(leaseObtainedStatus.isPresent()) {
Review Comment:
I see this is where we filter out the task if the lease is not obtained. It
would be good to explain in the Javadoc when we return an empty `Optional`
versus one containing a task.
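A minimal sketch of how that contract could be documented, under simplifying assumptions: `LeaseFilteredStream` is a hypothetical stand-in for `DagTaskStream`, actions and tasks are plain strings, and a `Predicate` stands in for lease arbitration. Unlike the quoted code, it polls the queue and returns empty when the queue has nothing, rather than peeking and throwing:

```java
import java.util.Optional;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for DagTaskStream.
class LeaseFilteredStream {
  private final BlockingDeque<String> actions = new LinkedBlockingDeque<>();
  // Stand-in for lease arbitration: true means this host obtained the lease.
  private final Predicate<String> leaseArbiter;

  LeaseFilteredStream(Predicate<String> leaseArbiter) {
    this.leaseArbiter = leaseArbiter;
  }

  boolean add(String action) {
    return actions.offer(action);
  }

  /**
   * Removes the next queued action and tries to turn it into a task.
   *
   * @return a task if an action was queued and this host obtained the lease on it;
   *         {@link Optional#empty()} if the queue was empty or the lease was not obtained
   */
  Optional<String> next() {
    String action = actions.poll();
    if (action == null) {
      return Optional.empty(); // nothing to do yet
    }
    if (!leaseArbiter.test(action)) {
      return Optional.empty(); // another host holds the lease on this action
    }
    return Optional.of("task-for-" + action);
  }
}

class LeaseFilteredStreamDemo {
  public static void main(String[] args) {
    LeaseFilteredStream stream = new LeaseFilteredStream(a -> a.startsWith("KILL"));
    stream.add("KILL:flow1");
    stream.add("RESUME:flow2");
    System.out.println(stream.next());
    System.out.println(stream.next());
  }
}
```

Spelling out both empty cases in the `@return` tag lets a caller see that an empty result means "skip this iteration", not an error.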
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/processor/CleanUpDagProc.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.processor;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+
+
+/**
+ * An implementation of {@link DagProc} that is responsible for clean up
{@link Dag} that has been completed
Review Comment:
Let's just say "for a dag that has reached an end state" and then list the
three possible end states.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -313,4 +317,42 @@ protected static long getUTCTimeFromDelayPeriod(long delayPeriodMillis) {
Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
}
+
+ /**
+ * Attempts to acquire lease for a given {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+ * through lease arbitration and if it fails, it will create and schedule a
reminder trigger to check back again.
Review Comment:
Isn't this what `handleTriggerEvent` already does? It tries to acquire the
lease, schedules a reminder, and returns the status. If you specifically want to
return `Optional<MultiActiveLeaseArbiter.LeaseObtainedStatus>`, you can put a
wrapper around it instead of rewriting this function.
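A rough sketch of the wrapper idea, using hypothetical stand-in types rather than the real Gobblin classes. `LeaseAttempt`, `LeaseObtained`, `LeasedToAnother`, and `TriggerHandlerSketch` are invented for illustration, and the real `handleTriggerEvent` signature is assumed, not quoted:

```java
import java.util.Optional;

// Hypothetical stand-ins for the lease status hierarchy.
abstract class LeaseAttempt {}

class LeaseObtained extends LeaseAttempt {
  final long eventTimeMillis;

  LeaseObtained(long eventTimeMillis) {
    this.eventTimeMillis = eventTimeMillis;
  }
}

class LeasedToAnother extends LeaseAttempt {}

// Hypothetical handler. The existing method already attempts the lease; in the
// real code it would also schedule a reminder when the lease is not obtained.
class TriggerHandlerSketch {
  private final boolean leaseAvailable;

  TriggerHandlerSketch(boolean leaseAvailable) {
    this.leaseAvailable = leaseAvailable;
  }

  LeaseAttempt handleTriggerEvent(String dagAction, long eventTimeMillis) {
    return leaseAvailable ? new LeaseObtained(eventTimeMillis) : new LeasedToAnother();
  }

  // Thin wrapper: reuse the existing method and narrow its result to the
  // "lease obtained" case instead of duplicating the arbitration logic.
  Optional<LeaseObtained> getLeaseOnDagAction(String dagAction, long eventTimeMillis) {
    LeaseAttempt status = handleTriggerEvent(dagAction, eventTimeMillis);
    return status instanceof LeaseObtained
        ? Optional.of((LeaseObtained) status)
        : Optional.empty();
  }
}

class TriggerHandlerSketchDemo {
  public static void main(String[] args) {
    System.out.println(new TriggerHandlerSketch(true).getLeaseOnDagAction("KILL:flow1", 42L).isPresent());
    System.out.println(new TriggerHandlerSketch(false).getLeaseOnDagAction("KILL:flow1", 42L).isPresent());
  }
}
```

Keeping the arbitration in one method means the reminder scheduling lives in a single place, and the wrapper only changes the return shape.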
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagManager} would pull from
to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} defines the rules
for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if
available to {@link DagManager}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterator<Optional<DagTask>>,
DagManagement {
+
+ @Getter
+ private final BlockingDeque<DagActionStore.DagAction> dagActionQueue = new
LinkedBlockingDeque<>();
+ private FlowTriggerHandler flowTriggerHandler;
+ private DagManagementStateStore dagManagementStateStore;
+ private DagManagerMetrics dagManagerMetrics;
+
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+
+ @Override
+ public Optional<DagTask> next() {
+
+ DagActionStore.DagAction dagAction = dagActionQueue.peek();
+ try {
+ Preconditions.checkArgument(dagAction != null, "No Dag Action found in
the queue");
+ Properties jobProps = getJobProperties(dagAction);
+ Optional<MultiActiveLeaseArbiter.LeaseObtainedStatus>
leaseObtainedStatus =
+ flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction,
System.currentTimeMillis()).toJavaUtil();
+ if(leaseObtainedStatus.isPresent()) {
+ DagTask dagTask = createDagTask(dagAction, leaseObtainedStatus.get());
+ return Optional.of(dagTask);
+ }
+ } catch (Exception ex) {
+ //TODO: need to handle exceptions gracefully
+ throw new RuntimeException(ex);
+ }
+ return Optional.empty();
+ }
+
+ public boolean add(DagActionStore.DagAction dagAction) throws IOException {
+ return this.dagActionQueue.offer(dagAction);
+ }
+
+ public DagActionStore.DagAction take() {
+ return this.dagActionQueue.poll();
+ }
+
+ public DagTask createDagTask(DagActionStore.DagAction dagAction,
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ DagActionStore.FlowActionType flowActionType =
dagAction.getFlowActionType();
+ switch (flowActionType) {
+ case KILL:
+ return new KillDagTask(dagAction, leaseObtainedStatus);
+ case RESUME:
+ case LAUNCH:
+ case ADVANCE:
+ default:
+ log.warn("It should not reach here. Yet to provide implementation.");
+ return null;
+ }
+ }
+
+ @Override
+ public void launchFlow(String flowGroup, String flowName, long
triggerTimeStamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not
supported.");
+ }
+
+ @Override
+ public void resumeFlow(String flowGroup, String flowName, String
flowExecutionId, long triggerTimeStamp) throws IOException,
InterruptedException {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not
supported.");
+ }
+
+ @Override
+ public void killFlow(String flowGroup, String flowName, String
flowExecutionId, long produceTimestamp) throws IOException {
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(flowGroup,
flowName, flowExecutionId);
+ if(!this.dagManagementStateStore.getDagIdToDags().containsKey(dagId.toString())) {
+ log.info("Invalid dag since not present in map. Hence cannot cancel it");
+ return;
+ }
+ DagActionStore.DagAction killAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.KILL);
+ if(!add(killAction)) {
+ throw new IOException("Could not add kill dag action: " + killAction +
" to the queue.");
+ }
+ }
+ /**
+ * Check if the SLA is configured for the flow this job belongs to.
+ * If it is, this method will try to cancel the job when SLA is reached.
+ *
+ * @param node dag node of the job
+ * @return true if the job is killed because it reached sla
+ * @throws ExecutionException exception
+ * @throws InterruptedException exception
+ */
+
+ @Override
+ public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan>
node) throws ExecutionException, InterruptedException {
+ //TODO: need to distribute the responsibility outside of this class
+ long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+ long currentTime = System.currentTimeMillis();
+ String dagId = DagManagerUtils.generateDagId(node).toString();
+
+ long flowSla;
+ if (this.dagManagementStateStore.getDagToSLA().containsKey(dagId)) {
+ flowSla = this.dagManagementStateStore.getDagToSLA().get(dagId);
+ } else {
+ try {
+ flowSla = DagManagerUtils.getFlowSLA(node);
+ } catch (ConfigException e) {
+ log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid format, using default SLA of {}",
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+ DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
+ flowSla = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+ }
+ this.dagManagementStateStore.getDagToSLA().put(dagId, flowSla);
+ }
+
+ if (currentTime > flowStartTime + flowSla) {
+ log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+ dagManagerMetrics.incrementExecutorSlaExceeded(node);
+ KillDagProc.killDagNode(node);
+
+ this.dagManagementStateStore.getDagIdToDags().get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
+ this.dagManagementStateStore.getDagIdToDags().get(dagId).setMessage("Flow killed due to exceeding SLA of " + flowSla + " ms");
+
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Cancel the job if the job has been "orphaned". A job is orphaned if has
been in ORCHESTRATED
+ * {@link ExecutionStatus} for some specific amount of time.
+ * @param node {@link Dag.DagNode} representing the job
+ * @param jobStatus current {@link JobStatus} of the job
+ * @return true if the total time that the job remains in the ORCHESTRATED
state exceeds
+ * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
+ */
+
+ @Override
+ public boolean enforceJobStartDeadline(Dag.DagNode<JobExecutionPlan> node,
JobStatus jobStatus) throws ExecutionException, InterruptedException {
+ //TODO: need to distribute the responsibility outside of this class
+ if (jobStatus == null) {
+ return false;
+ }
+ ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+ //TODO: initialize default job sla in millis via configs
+ long timeOutForJobStart = DagManagerUtils.getJobStartSla(node,
System.currentTimeMillis());
+ long jobOrchestratedTime = jobStatus.getOrchestratedTime();
+ if (executionStatus == ORCHESTRATED && System.currentTimeMillis() -
jobOrchestratedTime > timeOutForJobStart) {
+ log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing
the job now...",
+ DagManagerUtils.getJobName(node),
+ DagManagerUtils.getFullyQualifiedDagName(node),
+ timeOutForJobStart);
Review Comment:
Same as above: this should emit a flowStartSLAKilled event.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/processor/KillDagProc.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.processor;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc {
+
+ private KillDagTask killDagTask;
+ private DagManagementStateStore dagManagementStateStore;
+ private MetricContext metricContext;
+ private Optional<EventSubmitter> eventSubmitter;
+
+
+ public KillDagProc(KillDagTask killDagTask, DagManagementStateStore
dagManagementStateStore, MetricContext metricContext, Optional<EventSubmitter>
eventSubmitter) throws IOException {
+ this.killDagTask = killDagTask;
+ this.dagManagementStateStore = dagManagementStateStore;
+ this.metricContext = metricContext;
+ this.eventSubmitter = eventSubmitter;
+ }
+
+ public KillDagProc(KillDagTask killDagTask) {
+ this.killDagTask = killDagTask;
+ }
+
+ @Override
+ protected List<Dag.DagNode<JobExecutionPlan>>
initialize(DagManagementStateStore dagManagementStateStore) {
+ Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs =
dagManagementStateStore.getDagToJobs();
+ if(dagToJobs.containsKey(this.killDagTask.getKillDagId().toString())) {
+ return dagToJobs.get(this.killDagTask.getKillDagId().toString());
+ }
+ return Lists.newArrayList();
+ }
+
+ /**
+ * Post initialization of the Dag with the current state, it will identify
the {@link Dag.DagNode}s to be killed
+ * and cancel the job on the executor. The return type is kept as {@link
Object} since we might want to refactor
+ * or add more responsibility as part of the actions taken. Hence, after
completing all possible scenarios,
+ * it will make sense to update the method signature with its appropriate
type.
+ * @param state
+ * @return
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws IOException
+ */
+ @Override
+ protected Object act(Object state) throws Exception {
+ List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel =
(List<Dag.DagNode<JobExecutionPlan>>)state;
+ Preconditions.checkArgument(!dagNodesToCancel.isEmpty(), "Dag doesn't
contain any DagNodes to be cancelled");
+ String dagToCancel = this.killDagTask.getKillDagId().toString();
+
+ log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
+ for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+ killDagNode(dagNodeToCancel);
+ }
+ this.dagManagementStateStore.getDagIdToDags().get(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+ this.dagManagementStateStore.getDagIdToDags().get(dagToCancel).setMessage("Flow killed by request");
+ this.dagManagementStateStore.removeDagActionFromStore(this.killDagTask.getKillDagId(), DagActionStore.FlowActionType.KILL);
+ return this.dagManagementStateStore.getDagIdToDags().get(dagToCancel);
+
+ }
+
+ @Override
+ protected void sendNotification(Object result) throws
MaybeRetryableException {
+ Dag<JobExecutionPlan> dag = (Dag<JobExecutionPlan>) result;
+ for(Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dag.getNodes()) {
+ sendCancellationEvent(dagNodeToCancel.getValue());
+ }
+ }
+
+ public static void killDagNode(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel) throws ExecutionException, InterruptedException {
Review Comment:
Javadoc here, please.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/processor/DagProc.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.processor;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state,
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
Review Comment:
Who is the caller of this class?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.processor.DagProc;
+
+
+/**
+ * Defines an individual task or job in a Dag.
+ * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it
will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter}
as complete
+ * @param <T>
+ */
+
+@Alpha
+public abstract class DagTask<T> {
+
+ protected MultiActiveLeaseArbiter.LeaseObtainedStatus
leaseObtainedStatusStatus;
+
+ /**
+ * Currently, conclusion of {@link DagTask} marks and records a successful
release of lease.
+ * It is invoked after {@link DagProc#process(DagManagementStateStore)} is
completed successfully.
+ * @param multiActiveLeaseArbiter
+ * @throws IOException
+ */
Review Comment:
This differs from our approach to launch in that we finish the action before
completing the lease. Can you document the pros and cons of this method? For
example, a pro is that we don't have to do lease arbitration twice (once for
attempting the action and once for doing it). However, the lease validity time
now needs to be much larger, since it must include the time to contact the
executor and carry out the action. Let's note these details here or somewhere
else appropriate.
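To make the trade-off concrete, here is a rough sketch of how the minimum lease validity differs between the two approaches. The class, the method names, and all durations are hypothetical and purely illustrative; the description of the launch path reflects my reading of it, not a confirmed detail.

```java
import java.time.Duration;

// Hypothetical model of the trade-off; the durations used in main are illustrative, not measured.
class LeaseValiditySketch {

  /**
   * One lease covers the whole action (the approach in this PR): the lease must
   * outlive arbitration bookkeeping, the call to the executor, and the action itself.
   */
  static Duration singleLease(Duration bookkeeping, Duration contactExecutor, Duration action) {
    return bookkeeping.plus(contactExecutor).plus(action);
  }

  /**
   * The lease is completed before the action is carried out (assumed to match launch):
   * each lease only has to cover its own bookkeeping, at the cost of arbitrating twice.
   */
  static Duration perAttemptLease(Duration bookkeeping) {
    return bookkeeping;
  }

  public static void main(String[] args) {
    Duration bookkeeping = Duration.ofSeconds(1);
    Duration contactExecutor = Duration.ofSeconds(5);
    Duration action = Duration.ofSeconds(30);
    System.out.println("single lease:      " + singleLease(bookkeeping, contactExecutor, action));
    System.out.println("per-attempt lease: " + perAttemptLease(bookkeeping));
  }
}
```

The gap between the two values is the extra time during which a crashed lease holder would block other participants from retrying.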
Issue Time Tracking
-------------------
Worklog Id: (was: 881042)
Time Spent: 4h 40m (was: 4.5h)
> Refactor code to move current in-memory references to new design for REST
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1910
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>