phet commented on code in PR #3965:
URL: https://github.com/apache/gobblin/pull/3965#discussion_r1631651319
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -82,41 +86,66 @@ public void tearDown() throws Exception {
}
@Test
- public void launchDag()
- throws IOException, InterruptedException, URISyntaxException {
- Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1",
System.currentTimeMillis(), DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
- 5, "user5",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("fg")));
+ public void launchDag() throws IOException, InterruptedException,
URISyntaxException, ExecutionException {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ String flowExecutionId = "12345";
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1",
Long.parseLong(flowExecutionId),
+ DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5",
ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName)));
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
+ SpecProducer<Spec> specProducer =
DagManagerUtils.getSpecProducer(dag.getNodes().get(0));
Review Comment:
to dot-I's and cross-T's I'd suggest to first assert there's only one
specProducer
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -55,6 +56,37 @@
*/
@Slf4j
public class DagProcUtils {
+
+ /**
+ * If there is a single job to run next, it runs it. If there are multiple
jobs to run, it creates a
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagActionType#REEVALUATE}
dag action for
+ * each of them and those jobs will be launched in respective Reevaluate dag
proc.
Review Comment:
`{@link ReevaluateDagProc}`
##########
gobblin-metastore/src/test/java/org/apache/gobblin/metastore/testing/TestMetastoreDatabaseServer.java:
##########
@@ -97,7 +97,7 @@ class TestMetastoreDatabaseServer implements Closeable {
.withUser(this.dbUserName, this.dbUserPassword)
.withServerVariable("explicit_defaults_for_timestamp", "off")
// default `max_connections` is apparently 151 - see:
https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_max_connections
- .withServerVariable("max_connections", "501")
+ .withServerVariable("max_connections", "2000")
Review Comment:
hopefully this helps! ... I didn't have a lot of luck setting this myself
(to avoid the "too many conns" unit test failures elsewhere)
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -141,4 +170,14 @@ public static Dag<JobExecutionPlan>
buildDagWithMultipleNodesAtDifferentLevels(S
}
return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
}
+
+ public static List<SpecProducer<Spec>>
getDagSpecProducers(Dag<JobExecutionPlan> dag) {
Review Comment:
interesting this is only called from `ReevaluateDagProcTest`. does it
belong there?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -55,6 +56,37 @@
*/
@Slf4j
public class DagProcUtils {
+
+ /**
+ * If there is a single job to run next, it runs it. If there are multiple
jobs to run, it creates a
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagActionType#REEVALUATE}
dag action for
+ * each of them and those jobs will be launched in respective Reevaluate dag
proc.
+ */
+ public static void submitNextNodes(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag,
+ DagManager.DagId dagId) throws IOException {
+ Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
+
+ if (nextNodes.size() > 1) {
+ handleMultipleJobs(dagManagementStateStore, nextNodes);
+ return;
+ }
+
+ //Submit jobs from the dag ready for execution.
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+ DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
dagId);
+ log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), dagId);
+ }
Review Comment:
what's your take? IMO, this feels clear and direct:
```
if (nextNodes.size() == 1) {
submitJob(dmss, nextNodes.get(0), dagId)
log.info(...);
} else {
for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
dmss.addJobDagAction(...);
}
}
```
so only one loop, when it's actually needed, and no separate `private
static` method merely to perform looping and call only one method
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -90,9 +70,28 @@ protected void act(DagManagementStateStore
dagManagementStateStore, Pair<Optiona
}
Dag.DagNode<JobExecutionPlan> dagNode =
dagNodeWithJobStatus.getLeft().get();
- JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
- ExecutionStatus executionStatus = dagNode.getValue().getExecutionStatus();
+
+ if (!dagNodeWithJobStatus.getRight().isPresent()) {
+ // Usually reevaluate dag action is created by JobStatusMonitor when a
finished job status is available,
+ // but when reevaluate/resume/launch dag proc found multiple parallel
jobs to run next, it creates reevaluate
+ // dag actions for each of those parallel job and in this scenario there
is no job status available.
+ // If the job status is not present, this job was never launched, submit
it now.
+ submitJobForThisDagNode(dagManagementStateStore, dagNode);
+ return;
+ }
+
+ if
(!FlowStatusGenerator.FINISHED_STATUSES.contains(dagNodeWithJobStatus.getRight().get().getEventName()))
{
+ // this may happen if adding job status in the store failed after adding
a ReevaluateDagAction in KafkaJobStatusMonitor
+ throw new RuntimeException(String.format("Job status for dagNode %s is
%s. Re-evaluate dag action are created for"
+ + " new jobs with no job status when there are multiple of them
to run next; or when a job finishes with status - %s",
+ dagNodeId, dagNodeWithJobStatus.getRight().get().getEventName(),
FlowStatusGenerator.FINISHED_STATUSES));
+ }
+
Dag<JobExecutionPlan> dag =
dagManagementStateStore.getDag(getDagId()).get();
+ JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
+ ExecutionStatus executionStatus =
ExecutionStatus.valueOf(jobStatus.getEventName());
Review Comment:
probably these should be introduced before L83
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -123,6 +122,11 @@ protected void act(DagManagementStateStore
dagManagementStateStore, Pair<Optiona
}
}
+ private void submitJobForThisDagNode(DagManagementStateStore
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {
+ DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
getDagId());
+ log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), getDagId());
+ }
Review Comment:
logging like this could arguably go in `DagProcUtils.submitJobToExecutor`.
if so this class could call that directly, rather than wrapping the call in
this method that merely forwards
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -60,21 +59,19 @@ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<JobStatus>> ini
Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
dagNodeWithJobStatus =
dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
- if (!dagNodeWithJobStatus.getLeft().isPresent() ||
!dagNodeWithJobStatus.getRight().isPresent()) {
+ if (!dagNodeWithJobStatus.getLeft().isPresent()) {
// this is possible when MALA malfunctions and a duplicated reevaluate
dag proc is launched for a dag node that is
// already "reevaluated" and cleaned up.
return ImmutablePair.of(Optional.empty(), Optional.empty());
}
- ExecutionStatus executionStatus =
ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
- if
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
- log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should
have been created only for finished status - {}",
- dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
- // this may happen if adding job status in the store failed after adding
a ReevaluateDagAction in KafkaJobStatusMonitor
- throw new RuntimeException(String.format("Job status %s is not final for
job %s", executionStatus, getDagId()));
+ if (dagNodeWithJobStatus.getRight().isPresent()) {
Review Comment:
in the multi-job case of reevaluate wouldn't there be job status present?
isn't it only in the multi-job launch and resume cases?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java:
##########
@@ -206,4 +192,80 @@ public void testNoNextJobToRun() throws Exception {
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
}
+
+ @Test
+ public void testCurrentJobToRun() throws Exception {
+ String flowName = "fn3";
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 2, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ );
+ List<Dag.DagNode<JobExecutionPlan>> startDagNodes = dag.getStartNodes();
+ List<SpecProducer<Spec>> specProducers =
LaunchDagProcTest.getDagSpecProducers(dag);
+
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+ doReturn(new ImmutablePair<>(Optional.of(startDagNodes.get(0)),
Optional.empty()))
+ .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+ ReevaluateDagProc
+ reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
+ String.valueOf(flowExecutionId), "job0",
DagActionStore.DagActionType.REEVALUATE), null,
+ dagManagementStateStore));
+ reEvaluateDagProc.process(dagManagementStateStore);
+
+ long addSpecCount = specProducers.stream()
+ .mapToLong(p -> Mockito.mockingDetails(p)
+ .getInvocations()
+ .stream()
+ .filter(a -> a.getMethod().getName().equals("addSpec"))
+ .count())
+ .sum();
+
+ int numOfLaunchedJobs = 1; // only the current job
+ // only the current job should have run
+ Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+ Assert.assertEquals(numOfLaunchedJobs, addSpecCount);
+
+ // no job's state is deleted because that happens when the job finishes
triggered the reevaluate dag proc
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagNodeState(any(), any());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(any());
+ Mockito.verify(dagActionReminderScheduler,
Mockito.never()).unscheduleReminderJob(any());
+ }
+
+ @Test
+ public void testMultipleNextJobToRun() throws Exception {
+ String flowName = "fn4";
+ Dag<JobExecutionPlan> dag =
LaunchDagProcTest.buildDagWithMultipleNodesAtDifferentLevels("1",
String.valueOf(flowExecutionId),
+ DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5",
ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ );
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job3").flowExecutionId(flowExecutionId).
+ message("Test
message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+ doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+ // mocked job status for the first four jobs
+
dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+
dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+
dag.getNodes().get(3).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+
+ ReevaluateDagProc
+ reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
+ String.valueOf(flowExecutionId), "job3",
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
+ // process 4th job
+ reEvaluateDagProc.process(dagManagementStateStore);
+
+ int numOfLaunchedJobs = 2; // = number of jobs that should launch when 4th
job passes, i.e. 5th and 6th job
+ // parallel jobs are launched through reevaluate dag action
+ Mockito.verify(dagManagementStateStore, Mockito.times(numOfLaunchedJobs))
Review Comment:
let's validate specProducers here too. given this is a repeating trope of
counting `DagAction`s emitted and specProducer `addSpec` calls, consider
developing an abstraction around verifying the "side-effects" of
`DagProc.process`ing
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -134,6 +154,7 @@ private void setStatus(DagManagementStateStore
dagManagementStateStore,
for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
if (node.getValue().getId().equals(dagNodeId)) {
node.getValue().setExecutionStatus(executionStatus);
+ dagManagementStateStore.addDagNodeState(node, getDagId());
Review Comment:
good insight. where do you want to document as a code-level `TODO`?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -94,29 +93,50 @@ public void launchDag()
flowCompilationValidationHelper);
launchDagProc.process(this.dagManagementStateStore);
- int expectedNumOfSavingDagNodeStates = 1; // = number of start nodes
- Assert.assertEquals(expectedNumOfSavingDagNodeStates,
+ int numOfLaunchedJobs = 1; // = number of start nodes
+ Assert.assertEquals(numOfLaunchedJobs,
Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("addDagNodeState")).count());
- Mockito.verify(this.dagManagementStateStore, Mockito.times(1))
+ Mockito.verify(this.dagManagementStateStore,
Mockito.times(numOfLaunchedJobs))
.addFlowDagAction(any(), any(), any(),
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
}
+ @Test
+ public void launchDagWithMultipleParallelJobs() throws IOException,
InterruptedException, URISyntaxException {
Review Comment:
I know that. the reason I'm urging you to add it is that specProducer
validation (that never called for all of them) would have caught the bug
[here](https://github.com/apache/gobblin/pull/3965#discussion_r1631547273)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -89,11 +86,29 @@ protected void act(DagManagementStateStore
dagManagementStateStore, Pair<Optiona
return;
}
+ if (dagNodeWithJobStatus.getRight().isPresent()
+ &&
!FlowStatusGenerator.FINISHED_STATUSES.contains(dagNodeWithJobStatus.getRight().get().getEventName()))
{
+ // this may happen if adding job status in the store failed after adding
a ReevaluateDagAction in KafkaJobStatusMonitor
+ throw new RuntimeException(String.format("Job status for dagNode %s is
%s. Re-evaluate dag action are created for"
+ + " new jobs with no job status when there are multiple of them
to run next; or when a job finishes with status - %s",
+ dagNodeId, dagNodeWithJobStatus.getRight().get().getEventName(),
FlowStatusGenerator.FINISHED_STATUSES));
+ }
+
Dag.DagNode<JobExecutionPlan> dagNode =
dagNodeWithJobStatus.getLeft().get();
+
+ if (!dagNodeWithJobStatus.getRight().isPresent()) {
+ // if the job status is not present, this job was never launched, submit
it now
+ submitJobForThisDagNode(dagManagementStateStore, dagNode);
+ return;
+ }
Review Comment:
OK, that sounds reasonable. let's document the crux of this very clearly
stated rationale somewhere in source code... perhaps in this new `DagProcUtils`
method?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -57,25 +55,7 @@ public ReevaluateDagProc(ReevaluateDagTask
reEvaluateDagTask) {
@Override
protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
initialize(DagManagementStateStore dagManagementStateStore)
throws IOException {
- Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
dagNodeWithJobStatus =
- dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
-
- if (!dagNodeWithJobStatus.getLeft().isPresent() ||
!dagNodeWithJobStatus.getRight().isPresent()) {
- // this is possible when MALA malfunctions and a duplicated reevaluate
dag proc is launched for a dag node that is
- // already "reevaluated" and cleaned up.
- return ImmutablePair.of(Optional.empty(), Optional.empty());
- }
Review Comment:
don't we still need this check for `if (!dnwjs.getLeft().isPresent())` (to
return empty)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]