umustafi commented on code in PR #3950:
URL: https://github.com/apache/gobblin/pull/3950#discussion_r1605444064
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -30,12 +30,14 @@
public interface DagActionStore {
public static final String NO_JOB_NAME_DEFAULT = "";
enum DagActionType {
+ CANCEL, // Invoked through DagManager if flow has been stuck in
Orchestrated state for a while
KILL, // Kill invoked through API call
- RESUME, // Resume flow invoked through API call
LAUNCH, // Launch new flow execution invoked adhoc or through scheduled
trigger
+ REEVALUATE, // Re-evaluate what needs to be done upon receipt of a final
job status
+ RESUME, // Resume flow invoked through API call
RETRY, // Invoked through DagManager for flows configured to allow retries
- CANCEL, // Invoked through DagManager if flow has been stuck in
Orchestrated state for a while
- REEVALUATE // Re-evaluate what needs to be done upon receipt of a final
job status
+ ENFORCE_START_DEADLINE, // Enforce job start deadline
+ ENFORCE_FINISH_DEADLINE, // Enforce job finish deadline
Review Comment:
Are both of these job-level, or is the finish deadline flow-level?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -69,6 +72,10 @@ public DagNodeId getDagNodeId() {
return new DagNodeId(this.flowGroup, this.flowName,
Long.parseLong(this.flowExecutionId), this.flowGroup, this.jobName);
}
+
+ public void setReminder(boolean isReminder) {
Review Comment:
Replace with Lombok's `@Setter` annotation.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -246,6 +256,19 @@ protected void
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
}
}
+ private void clearStartDeadlineTriggerAndDagAction(String flowGroup, String
flowName, String flowExecutionId, String jobName) {
+
+ DagActionStore.DagAction enforceStartDeadlineDagAction = new
DagActionStore.DagAction(flowGroup, flowName,
+ String.valueOf(flowExecutionId), jobName,
DagActionStore.DagActionType.ENFORCE_START_DEADLINE);
+ try {
+
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(enforceStartDeadlineDagAction);
+
GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceStartDeadlineDagAction);
+ } catch (SchedulerException | IOException e) {
+ log.warn("Failed to unschedule the reminder for {}",
enforceStartDeadlineDagAction);
Review Comment:
Maybe error-level logging. Although it's possible the scheduler reminder
fired and was deleted, so that can be warn while the other is error.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -191,4 +196,16 @@ private void submitNextNodes(DagManagementStateStore
dagManagementStateStore, Da
private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>>
nextNodes) {
throw new UnsupportedOperationException("More than one start job is not
allowed");
}
+
+ private void clearFinishDeadlineTriggerAndDagAction() {
Review Comment:
nit: rename to "removeFinish..."
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFinishDeadlineDagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+ private ITestMetastoreDatabase testMetastoreDatabase;
+ private final MockedStatic<GobblinServiceManager>
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ this.mockedGobblinServiceManager.close();
+ }
+
+ @Test
+ public void enforceStartDeadlineTest() throws Exception {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowExecutionId = System.currentTimeMillis();
+ MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+ doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 5, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L)));
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+ message("Test
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+ doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+ this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+ dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
has not yet started running
+
+ EnforceStartDeadlineDagProc enforceStartDeadlineDagProc = new
EnforceStartDeadlineDagProc(
+ new EnforceStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+ "job0", DagActionStore.DagActionType.ENFORCE_START_DEADLINE),
null, mock(DagActionStore.class)));
+ enforceStartDeadlineDagProc.process(dagManagementStateStore);
+
+ int expectedNumOfDeleteDagNodeStates = 1; // the one dag node
corresponding to the EnforceStartDeadlineDagProc
+ Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count());
+ }
+
+ @Test
+ public void enforceFinishDeadlineTest() throws Exception {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowExecutionId = System.currentTimeMillis();
+ MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+ doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+ int numOfDagNodes = 5;
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ numOfDagNodes, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L)));
Review Comment:
This code is common to the two tests; can we make it a class variable?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -111,4 +122,52 @@ public static void
submitJobToExecutor(DagManagementStateStore dagManagementStat
throw new RuntimeException(e);
}
}
+
+ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws
IOException {
+ Properties props = new Properties();
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+ if
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
{
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ }
+
+ try {
+ if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+ Future future = dagNodeToCancel.getValue().getJobFuture().get();
+ String serializedFuture =
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+ props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
serializedFuture);
+ sendCancellationEvent(dagNodeToCancel.getValue());
+ } else {
+ log.warn("No Job future when canceling DAG node (hence, not sending
cancellation event) - {}",
+ dagNodeToCancel.getValue().getJobSpec().getUri());
+ }
+
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
props).get();
+ // todo - why was it not being cleaned up in DagManager?
+ dagManagementStateStore.deleteDagNodeState(dagId, dagNodeToCancel);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static void cancelDag(Dag<JobExecutionPlan> dag,
DagManagementStateStore dagManagementStateStore) throws IOException {
+ List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
+ log.info("Found {} DagNodes to cancel (DagId {}).",
dagNodesToCancel.size(), DagManagerUtils.generateDagId(dag));
+
+ for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+ DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
+ }
+ }
+
+ public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(CANCELLED);
+ }
+
+ private static void
sendEnforceStartDeadlineDagAction(Dag.DagNode<JobExecutionPlan> dagNode)
Review Comment:
do we not have a method for enforceJobFinishDeadlineDagAction?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFinishDeadlineDagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+ private ITestMetastoreDatabase testMetastoreDatabase;
+ private final MockedStatic<GobblinServiceManager>
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ this.mockedGobblinServiceManager.close();
+ }
+
+ @Test
+ public void enforceStartDeadlineTest() throws Exception {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowExecutionId = System.currentTimeMillis();
+ MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+ doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 5, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L)));
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+ message("Test
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+ doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+ this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+ dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
has not yet started running
+
+ EnforceStartDeadlineDagProc enforceStartDeadlineDagProc = new
EnforceStartDeadlineDagProc(
+ new EnforceStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+ "job0", DagActionStore.DagActionType.ENFORCE_START_DEADLINE),
null, mock(DagActionStore.class)));
+ enforceStartDeadlineDagProc.process(dagManagementStateStore);
+
+ int expectedNumOfDeleteDagNodeStates = 1; // the one dag node
corresponding to the EnforceStartDeadlineDagProc
+ Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count());
+ }
+
+ @Test
+ public void enforceFinishDeadlineTest() throws Exception {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowExecutionId = System.currentTimeMillis();
+ MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+ doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
Review Comment:
can we move this to setup?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -135,6 +153,46 @@ public DagTask next() {
}
}
+ private void createStartDeadlineTrigger(DagActionStore.DagAction dagAction)
+ throws SchedulerException {
+ Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
dagNodeToCreateTriggerFor =
+
this.dagManagementStateStore.getDagNodeWithJobStatus(dagAction.getDagNodeId());
+
+ TimeUnit jobStartTimeUnit =
TimeUnit.valueOf(ConfigUtils.getString(this.config,
DagManager.JOB_START_SLA_UNITS,
+ ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+ long defaultJobStartSlaTimeMillis =
jobStartTimeUnit.toMillis(ConfigUtils.getLong(this.config,
+ DagManager.JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+
+ long timeOutForJobStart =
DagManagerUtils.getJobStartSla(dagNodeToCreateTriggerFor.getLeft().get(),
defaultJobStartSlaTimeMillis);
+ long jobOrchestratedTime =
dagNodeToCreateTriggerFor.getRight().get().getOrchestratedTime();
+ long reminderDuration = jobOrchestratedTime + timeOutForJobStart -
System.currentTimeMillis();
Review Comment:
When this fires, is that the time it should have completed by? Let's think about
when we should calculate this time and who should store it: dagAction and/or
task and/or reminder.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -135,6 +153,46 @@ public DagTask next() {
}
}
+ private void createStartDeadlineTrigger(DagActionStore.DagAction dagAction)
+ throws SchedulerException {
+ Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
dagNodeToCreateTriggerFor =
+
this.dagManagementStateStore.getDagNodeWithJobStatus(dagAction.getDagNodeId());
+
+ TimeUnit jobStartTimeUnit =
TimeUnit.valueOf(ConfigUtils.getString(this.config,
DagManager.JOB_START_SLA_UNITS,
+ ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+ long defaultJobStartSlaTimeMillis =
jobStartTimeUnit.toMillis(ConfigUtils.getLong(this.config,
+ DagManager.JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+
+ long timeOutForJobStart =
DagManagerUtils.getJobStartSla(dagNodeToCreateTriggerFor.getLeft().get(),
defaultJobStartSlaTimeMillis);
Review Comment:
What does timeout mean here: the time it should have started by?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceStartDeadlineDagProc.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed
and cancel the job if it does not start in
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}
time.
+ */
+@Slf4j
+public class EnforceStartDeadlineDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+ public EnforceStartDeadlineDagProc(EnforceStartDeadlineDagTask
enforceStartDeadlineDagTask) {
+ super(enforceStartDeadlineDagTask);
+ }
+
+ @Override
+ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
+ throws IOException {
+ return dagManagementStateStore.getDag(getDagId());
+ }
+
+ @Override
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ log.info("Request to enforce deadlines for dag {}", getDagId());
+
+ if (!dag.isPresent()) {
+ // todo - add a metric here
+ log.error("Did not find Dag with id {}, it might be already
cancelled/finished and thus cleaned up from the store.",
+ getDagId());
+ return;
+ }
+
+ enforceStartDeadline(dagManagementStateStore, dag);
+ }
+
+ private void enforceStartDeadline(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+ dagNodeToCheckDeadline =
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+ if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+ // this should never happen; a job for which DEADLINE_ENFORCEMENT dag
action is created must have a dag node in store
Review Comment:
log an error here then
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -96,8 +98,8 @@ public void execute(JobExecutionContext context) {
log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ",
flowName: " + flowName
+ ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
- DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
- dagActionType);
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName, dagActionType);
+ dagAction.setReminder(true);
Review Comment:
why not set this in the line above?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -124,9 +135,16 @@ public DagTask next() {
while (true) {
try {
DagActionStore.DagAction dagAction = this.dagActionQueue.take();
- LeaseAttemptStatus leaseAttemptStatus =
retrieveLeaseStatus(dagAction);
- if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus) {
- return createDagTask(dagAction,
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
+ // create triggers for original (non-reminder) dag actions of type
ENFORCE_START_DEADLINE and ENFORCE_FINISH_DEADLINE
Review Comment:
Add a comment explaining why we don't acquire the lease for these deadline actions
right after taking them from the queue, but rather at the time of enforcement, so
the logic is easy to understand.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -111,4 +122,52 @@ public static void
submitJobToExecutor(DagManagementStateStore dagManagementStat
throw new RuntimeException(e);
}
}
+
+ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws
IOException {
+ Properties props = new Properties();
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+ if
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
{
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ }
+
+ try {
+ if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+ Future future = dagNodeToCancel.getValue().getJobFuture().get();
+ String serializedFuture =
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+ props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
serializedFuture);
+ sendCancellationEvent(dagNodeToCancel.getValue());
+ } else {
+ log.warn("No Job future when canceling DAG node (hence, not sending
cancellation event) - {}",
+ dagNodeToCancel.getValue().getJobSpec().getUri());
Review Comment:
Should we emit a metric here? Is this a valid case, or should it be an error?
Let's add a comment with more info here.
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFinishDeadlineDagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+ private ITestMetastoreDatabase testMetastoreDatabase;
+ private final MockedStatic<GobblinServiceManager>
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ this.mockedGobblinServiceManager.close();
+ }
+
+ @Test
+ public void enforceStartDeadlineTest() throws Exception {
Review Comment:
javadocs to describe what's happening in these tests
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -135,6 +153,46 @@ public DagTask next() {
}
}
+ private void createStartDeadlineTrigger(DagActionStore.DagAction dagAction)
+ throws SchedulerException {
+ Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
dagNodeToCreateTriggerFor =
+
this.dagManagementStateStore.getDagNodeWithJobStatus(dagAction.getDagNodeId());
+
+ TimeUnit jobStartTimeUnit =
TimeUnit.valueOf(ConfigUtils.getString(this.config,
DagManager.JOB_START_SLA_UNITS,
+ ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+ long defaultJobStartSlaTimeMillis =
jobStartTimeUnit.toMillis(ConfigUtils.getLong(this.config,
+ DagManager.JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+
+ long timeOutForJobStart =
DagManagerUtils.getJobStartSla(dagNodeToCreateTriggerFor.getLeft().get(),
defaultJobStartSlaTimeMillis);
+ long jobOrchestratedTime =
dagNodeToCreateTriggerFor.getRight().get().getOrchestratedTime();
+ long reminderDuration = jobOrchestratedTime + timeOutForJobStart -
System.currentTimeMillis();
Review Comment:
It feels like the dagTask and proc should carry an epoch that contains this
information. We can save it in the reminder's jobDataMap if you overload the
method that schedules the reminder.
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFinishDeadlineDagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+ private ITestMetastoreDatabase testMetastoreDatabase;
+ private final MockedStatic<GobblinServiceManager>
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ this.mockedGobblinServiceManager.close();
+ }
+
+ @Test
+ public void enforceStartDeadlineTest() throws Exception {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowExecutionId = System.currentTimeMillis();
+ MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+ doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 5, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L)));
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+ message("Test
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+ doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+ this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+ dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
has not yet started running
+
+ EnforceStartDeadlineDagProc enforceStartDeadlineDagProc = new
EnforceStartDeadlineDagProc(
+ new EnforceStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+ "job0", DagActionStore.DagActionType.ENFORCE_START_DEADLINE),
null, mock(DagActionStore.class)));
+ enforceStartDeadlineDagProc.process(dagManagementStateStore);
+
+ int expectedNumOfDeleteDagNodeStates = 1; // the one dag node
corresponding to the EnforceStartDeadlineDagProc
+ Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count());
+ }
+
+ @Test
+ public void enforceFinishDeadlineTest() throws Exception {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowExecutionId = System.currentTimeMillis();
Review Comment:
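Could these (flowGroup, flowName, flowExecutionId) be shared class-level
variables instead of being redeclared in each test?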
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceStartDeadlineDagProc.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed
and cancel the job if it does not start in
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}
time.
+ */
+@Slf4j
+public class EnforceStartDeadlineDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+ public EnforceStartDeadlineDagProc(EnforceStartDeadlineDagTask
enforceStartDeadlineDagTask) {
+ super(enforceStartDeadlineDagTask);
+ }
+
+ @Override
+ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
+ throws IOException {
+ return dagManagementStateStore.getDag(getDagId());
+ }
+
+ @Override
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ log.info("Request to enforce deadlines for dag {}", getDagId());
+
+ if (!dag.isPresent()) {
+ // todo - add a metric here
+ log.error("Did not find Dag with id {}, it might be already
cancelled/finished and thus cleaned up from the store.",
+ getDagId());
+ return;
+ }
+
+ enforceStartDeadline(dagManagementStateStore, dag);
+ }
+
+ private void enforceStartDeadline(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+ dagNodeToCheckDeadline =
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+ if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+ // this should never happen; a job for which DEADLINE_ENFORCEMENT dag
action is created must have a dag node in store
+ return;
+ }
+
+ Dag.DagNode<JobExecutionPlan> dagNode =
dagNodeToCheckDeadline.getLeft().get();
+ long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode,
1000000L);//, this.defaultJobStartSlaTimeMillis);
Review Comment:
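Should the hard-coded 1000000L be replaced with the default job start SLA
(the commented-out defaultJobStartSlaTimeMillis)?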
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]