phet commented on a change in pull request #3439:
URL: https://github.com/apache/gobblin/pull/3439#discussion_r766454204
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
##########
@@ -65,6 +65,7 @@
private final SpecExecutor specExecutor;
private ExecutionStatus executionStatus = ExecutionStatus.PENDING;
private final int maxAttempts;
+ private int currentGeneration = 1;
private int currentAttempts = 0;
Review comment:
Above, "1" appeared to be the default/fallback for both generation and
attempts, but here and below attempts starts at 0 while generation starts
at 1.
1. Why a different starting value for each?
2. Whatever starting value we use, is there a discrepancy between the
`getProp` fallback and the initializer here?
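One way to rule out the discrepancy raised in (2) would be to route both the field initializers and the lookup fallbacks through shared constants. A minimal sketch only: `RetryDefaults`, its constant names, and the chosen values are hypothetical, and the `Properties`-based lookup merely stands in for Gobblin's `State.getPropAsInt`.

```java
import java.util.Properties;

// Hypothetical holder for the starting values; not part of Gobblin.
final class RetryDefaults {
  // Assumed starting values, mirroring the field initializers in the diff above.
  static final int INITIAL_GENERATION = 1;
  static final int INITIAL_ATTEMPTS = 0;

  private RetryDefaults() {}

  // Stand-in for State.getPropAsInt(key, fallback), built on java.util.Properties.
  static int getPropAsInt(Properties props, String key, int fallback) {
    String value = props.getProperty(key);
    return value == null ? fallback : Integer.parseInt(value.trim());
  }
}
```

With this in place, a reader that passes `RetryDefaults.INITIAL_ATTEMPTS` as the fallback can never disagree with a writer that initializes the field from the same constant.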
##########
File path:
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
##########
@@ -43,17 +43,19 @@
List<TaskState> taskStates = jobContext.getJobState().getTaskStates();
String taskException =
EventMetadataUtils.getTaskFailureExceptions(taskStates);
String jobException =
EventMetadataUtils.getJobFailureExceptions(jobContext.getJobState());
-
+ ImmutableMap.Builder<String, String> metadataBuilder =
ImmutableMap.builder();
+ metadataBuilder.put(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD,
jobContext.getJobState().getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD,
""));
Review comment:
I recall seeing you add the watermarks recently... I thought that was a
stand-alone change you had already merged, wasn't it?
Or are you now extending that PR with this other fix? In general, I'd
encourage us to keep separate changes separate (so two PRs in this case).
##########
File path:
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
##########
@@ -89,6 +89,8 @@
public static final String PROCESSED_COUNT_FIELD = "processedCount";
public static final String MAX_ATTEMPTS_FIELD = "maxAttempts";
public static final String CURRENT_ATTEMPTS_FIELD = "currentAttempts";
+ //This state should always move forward
Review comment:
Definitely true... From here, would you want to refer the maintainer to the
description in
`KafkaJobStatusMonitor.addJobStatusToStateStore`?
##########
File path:
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
##########
@@ -97,6 +99,58 @@ protected void addFlowIdJobStatusToStateStore(String
flowGroup, String flowName,
KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore());
}
+ @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
+ public void testOutOfOrderJobTimingEventsForRetryingJob() throws IOException
{
+ long flowExecutionId = 1240L;
+ Properties properties = new Properties();
+
properties.setProperty(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
"0");
+
properties.setProperty(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
"1");
+ properties.setProperty(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
"false");
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.RUNNING.name(), JOB_START_TIME, JOB_START_TIME, properties);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME,
JOB_ORCHESTRATED_TIME, properties);
+ addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.FAILED.name(), 0, 0, properties);
+ Iterator<JobStatus>
+ jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId);
+ JobStatus jobStatus = jobStatusIterator.next();
+ if (jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY)) {
+ jobStatus = jobStatusIterator.next();
+ }
+ Assert.assertEquals(jobStatus.getEventName(),
ExecutionStatus.PENDING_RETRY.name());
+ Assert.assertEquals(jobStatus.isShouldRetry(), true);
+ properties = new Properties();
+
properties.setProperty(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
"1");
+
properties.setProperty(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
"1");
+ properties.setProperty(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
"false");
Review comment:
Because these are long and repeated several times, maybe add a helper like:
```
static Properties createAttemptsProperties(int currGen, int currAttempts,
boolean shouldRetry) {
...
}
```
?
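A fuller sketch of that helper, for illustration. The key string `"currentAttempts"` appears in the `TimingEvent` diff above; the other two key strings are assumptions, and the real test would reference the `TimingEvent.FlowEventConstants` fields directly instead of the local constants used here.

```java
import java.util.Properties;

// Hypothetical wrapper class so the sketch compiles on its own.
final class AttemptsPropertiesSketch {
  // Assumed key strings; in the test these would be TimingEvent.FlowEventConstants.*.
  static final String CURRENT_GENERATION_FIELD = "currentGeneration";
  static final String CURRENT_ATTEMPTS_FIELD = "currentAttempts";
  static final String SHOULD_RETRY_FIELD = "shouldRetry";

  private AttemptsPropertiesSketch() {}

  // Builds the three retry-related properties that the test currently sets by hand.
  static Properties createAttemptsProperties(int currGen, int currAttempts, boolean shouldRetry) {
    Properties properties = new Properties();
    properties.setProperty(CURRENT_GENERATION_FIELD, String.valueOf(currGen));
    properties.setProperty(CURRENT_ATTEMPTS_FIELD, String.valueOf(currAttempts));
    properties.setProperty(SHOULD_RETRY_FIELD, String.valueOf(shouldRetry));
    return properties;
  }
}
```

Each repeated block in the test would then collapse to a single call such as `createAttemptsProperties(1, 0, false)`.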
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -253,14 +253,22 @@ static void
addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS
List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
if (states.size() > 0) {
- String previousStatus = states.get(states.size() -
1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
+ String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-
- // PENDING_RESUME is allowed to override, because it happens when a flow
is being resumed from previously being failed
- if (previousStatus != null && currentStatus != null &&
!currentStatus.equals(ExecutionStatus.PENDING_RESUME.name())
- &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
- log.warn(String.format("Received status %s when status is already %s
for flow (%s, %s, %s), job (%s, %s)",
- currentStatus, previousStatus, flowGroup, flowName,
flowExecutionId, jobGroup, jobName));
+ int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
+ int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
+ int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
+ int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
+ // We use three things to determine the order, generation means how many
times job has been resumed, attempts means during this generation, how many
times job has been retried
+ // And job status reflect the execution status in one attempt
+ if (previousStatus != null && currentStatus != null &&
+ (previousGeneration > currentGeneration
+ || (previousGeneration == currentGeneration && previousAttempts
> currentAttempts)
+ || (previousGeneration == currentGeneration && previousAttempts
== currentAttempts
+ &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
+ log.warn(String.format("Received status %s generation %s attempts %s
when status is already %s generation %s attempts %s for flow (%s, %s, %s), job
(%s, %s)",
Review comment:
just personal preference, but I would put the 'coordinates' all
together, rather than mixed in with their descriptions, so:
```
"Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s]
for flow (%s, %s, %s), job (%s, %s)", ...
```
This makes visual scanning easier and shortens the text to search for when
looking for an expected value, such as "already RUNNING [2.1]" (vs. "already
RUNNING generation 2 attempts 1").
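To make the suggested format concrete, here is a small sketch of the message being built with `String.format`. The class and method names are hypothetical, and the parameter types (for example `long` for `flowExecutionId`) are assumptions rather than what `KafkaJobStatusMonitor` necessarily uses.

```java
// Hypothetical helper illustrating the "[generation.attempts]" message layout.
final class StatusLogFormatSketch {
  private StatusLogFormatSketch() {}

  static String outOfOrderMessage(String currentStatus, int currentGeneration, int currentAttempts,
      String previousStatus, int previousGeneration, int previousAttempts,
      String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName) {
    // Each status is immediately followed by its [generation.attempts] coordinates.
    return String.format(
        "Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] "
            + "for flow (%s, %s, %s), job (%s, %s)",
        currentStatus, currentGeneration, currentAttempts,
        previousStatus, previousGeneration, previousAttempts,
        flowGroup, flowName, flowExecutionId, jobGroup, jobName);
  }
}
```

The returned string would be passed to `log.warn(...)` as before; only the layout of the message changes.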
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -253,14 +253,22 @@ static void
addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS
List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
if (states.size() > 0) {
- String previousStatus = states.get(states.size() -
1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
+ String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-
- // PENDING_RESUME is allowed to override, because it happens when a flow
is being resumed from previously being failed
- if (previousStatus != null && currentStatus != null &&
!currentStatus.equals(ExecutionStatus.PENDING_RESUME.name())
- &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
- log.warn(String.format("Received status %s when status is already %s
for flow (%s, %s, %s), job (%s, %s)",
- currentStatus, previousStatus, flowGroup, flowName,
flowExecutionId, jobGroup, jobName));
+ int previousGeneration =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
+ int currentGeneration =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
1);
+ int previousAttempts =
previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
+ int currentAttempts =
jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
1);
+ // We use three things to determine the order, generation means how many
times job has been resumed, attempts means during this generation, how many
times job has been retried
Review comment:
Minor, but it seems not so much "to determine the order" as "to
accurately count and thereby bound retries, even amidst out-of-order events (by
skipping late arrivals). generation tracks..."
Then later explain, "the generation is monotonically increasing, while the
attempts may re-initialize back to 0. this two-part form prevents the
composite value from ever repeating." (or something similar...)
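The check in the diff amounts to a lexicographic comparison on (generation, attempts, status rank). A minimal sketch of that reading follows; the class and method names are hypothetical, and the status list is an illustrative subset whose order is assumed, not Gobblin's actual `ORDERED_EXECUTION_STATUSES`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the late-arrival check; not the Gobblin implementation.
final class StatusOrderingSketch {
  // Illustrative subset in an assumed order.
  static final List<String> ORDERED_STATUSES =
      Arrays.asList("PENDING", "ORCHESTRATED", "RUNNING", "FAILED");

  private StatusOrderingSketch() {}

  // Returns true when the incoming event is older than what is already stored,
  // i.e. when it should be skipped as a late arrival.
  static boolean isLateArrival(int previousGeneration, int previousAttempts, String previousStatus,
      int currentGeneration, int currentAttempts, String currentStatus) {
    if (previousGeneration != currentGeneration) {
      return previousGeneration > currentGeneration; // generation only moves forward
    }
    if (previousAttempts != currentAttempts) {
      return previousAttempts > currentAttempts;     // attempts only grow within a generation
    }
    // Same generation and attempt: fall back to the status ordering.
    return ORDERED_STATUSES.indexOf(currentStatus) < ORDERED_STATUSES.indexOf(previousStatus);
  }
}
```

Because attempts are compared only when the generations match, an attempt counter that resets to 0 in a new generation still compares as newer than any event from the previous generation.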