[ 
https://issues.apache.org/jira/browse/GOBBLIN-1634?focusedWorklogId=763170&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-763170
 ]

ASF GitHub Bot logged work on GOBBLIN-1634:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Apr/22 21:58
            Start Date: 27/Apr/22 21:58
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3495:
URL: https://github.com/apache/gobblin/pull/3495#discussion_r860238661


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -444,6 +445,103 @@ public void testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobSLAKilledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobStartSLAKilledEvent(),
+        // simulate another retry (maximum attempts = 2), but verify that the cancel event will not trigger a retry
+        createJobOrchestratedEvent(1),
+        createJobCancelledEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test", config, 1);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+      this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    StateStore stateStore = jobStatusMonitor.getStateStore();
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+    String tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+    List<State> stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    State state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, this.jobGroup, this.jobName);
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    // Because the maximum attempt count is 2, the state is set to PENDING_RETRY after the first kill
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
+    Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD), Boolean.toString(true));
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    // Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    // The start-SLA kill marks the job PENDING_RETRY again
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.PENDING_RETRY.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    // Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());

Review Comment:
   I do see the difficulty of potentially async publication with the events (is that why you `Thread.sleep(1000)`?).  that said, I have difficulty at this point recalling which message should be coming from each particular `recordIterator.next()`.
   
   there may not be a good approach here, but ones we could always consider would be to:
   a. publish one message at a time (and keep reading one by one from the same `recordIterator`)
   b. write an abstraction to publish one message and return a `recordIterator` for it (or better still, a single element from the iterator)
   c. condense each set of assertions (following `stateList = stateStore.getAll(storeName, tableName)`) into a lambda, then define each event in the same place near its corresponding assertion lambda.
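   A minimal, self-contained sketch of option (c), using plain strings and `java.util` in place of the Gobblin event and `State` types (all names here are hypothetical stand-ins, not the PR's API):

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.function.Consumer;

   public class PairedAssertions {
       public static void main(String[] args) {
           // Each entry pairs an "event" (here just its name) with the assertion
           // that should hold right after that event is processed.  In the real
           // test the key would be the GobblinTrackingEvent and the value a
           // lambda over the State read back from the state store.
           Map<String, Consumer<String>> script = new LinkedHashMap<>();
           script.put("FlowCompiled",    s -> check(s, "COMPILED"));
           script.put("JobOrchestrated", s -> check(s, "ORCHESTRATED"));
           script.put("JobSLAKilled",    s -> check(s, "PENDING_RETRY"));

           script.forEach((event, assertion) -> {
               // stand-in for: submitEvent + report + processMessage + getAll
               String status = process(event);
               assertion.accept(status);
           });
           System.out.println("all steps passed");
       }

       // hypothetical stand-in mapping an event name to the resulting status
       static String process(String eventName) {
           switch (eventName) {
               case "FlowCompiled":    return "COMPILED";
               case "JobOrchestrated": return "ORCHESTRATED";
               case "JobSLAKilled":    return "PENDING_RETRY";
               default:                return "UNKNOWN";
           }
       }

       static void check(String actual, String expected) {
           if (!actual.equals(expected)) {
               throw new AssertionError("expected " + expected + " but was " + actual);
           }
       }
   }
   ```

   this keeps each event definition adjacent to its expected outcome, so a reader never has to count `recordIterator.next()` calls.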



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java:
##########
@@ -72,6 +72,8 @@ public static class FlowTimings {
     public static final String FLOW_FAILED = "FlowFailed";
     public static final String FLOW_RUNNING = "FlowRunning";
     public static final String FLOW_CANCELLED = "FlowCancelled";
+    public static final String FLOW_SLA_KILLED = "FlowSLAKilled";
+    public static final String FLOW_START_SLA_KILLED = "FlowStartSLAKilled";

Review Comment:
   \<meta>I'm having trouble separating what may be a personal pet peeve vs. overall clarity.\</meta>
   
   I find 'SLA' a confusing term for what is essentially a deadline.  therefore I'd suggest semantic names in our state space, such as:
   ```
   FLOW_RUN_DEADLINE_EXCEEDED
   FLOW_START_DEADLINE_EXCEEDED
   ```
   \<meta>still, I don't wish to insist this is semantically clearer for future maintainers (it seems users won't observe the state, correct?), should you find this merely personal preference on my part.\</meta>
   
   for background, an SLA is an enforceable expectation.  it should be monitored and deviations detected, but I am not familiar with any precedent for immediately terminating processing upon violation.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -279,7 +278,8 @@ static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS
     }
 
     modifyStateIfRetryRequired(jobStatus);
-
+    // Remove data that need not be stored in the state store
+    jobStatus.removeProp(TimingEvent.FlowEventConstants.IS_FLOW_SLA_KILLED);

Review Comment:
   could be a case for putting the `removeProp` within `modifyStateIf...`
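   one possible shape for that refactor, sketched with a plain `Map` standing in for the Gobblin `State` and hypothetical key names (not the PR's actual constants):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class RetryState {
       // Sketch of folding the removeProp into modifyStateIfRetryRequired, so
       // the transient SLA-killed flag is consumed and stripped in one place.
       static void modifyStateIfRetryRequired(Map<String, String> jobStatus) {
           boolean slaKilled = Boolean.parseBoolean(
               jobStatus.getOrDefault("isFlowSlaKilled", "false"));
           int attempts = Integer.parseInt(jobStatus.getOrDefault("currentAttempts", "1"));
           int maxAttempts = Integer.parseInt(jobStatus.getOrDefault("maxAttempts", "1"));
           if (slaKilled && attempts < maxAttempts) {
               jobStatus.put("eventName", "PENDING_RETRY");
               jobStatus.put("shouldRetry", "true");
           }
           // the flag is internal bookkeeping; strip it before persisting
           jobStatus.remove("isFlowSlaKilled");
       }

       public static void main(String[] args) {
           Map<String, String> status = new HashMap<>();
           status.put("isFlowSlaKilled", "true");
           status.put("currentAttempts", "1");
           status.put("maxAttempts", "2");
           modifyStateIfRetryRequired(status);
           System.out.println(status.get("eventName") + " " + status.containsKey("isFlowSlaKilled"));
       }
   }
   ```

   that way no caller can forget to strip the flag before the state reaches the store.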



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java:
##########
@@ -72,6 +72,8 @@ public static class FlowTimings {
     public static final String FLOW_FAILED = "FlowFailed";
     public static final String FLOW_RUNNING = "FlowRunning";
     public static final String FLOW_CANCELLED = "FlowCancelled";
+    public static final String FLOW_SLA_KILLED = "FlowSLAKilled";
+    public static final String FLOW_START_SLA_KILLED = "FlowStartSLAKilled";

Review Comment:
   and overall, I didn't initially see a way to define these semantics w/o adding new `ExecutionStatus` states, but that's what you did, and AFAICT it's a better approach!



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -558,6 +656,63 @@ private GobblinTrackingEvent createJobFailedEvent() {
     GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
     return event;
   }
+
+  private GobblinTrackingEvent createJobCancelledEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.FlowTimings.FLOW_CANCELLED;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
+    metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
+    metadata.put(TimingEvent.METADATA_START_TIME, "7");
+    metadata.put(TimingEvent.METADATA_END_TIME, "8");
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
+    return event;
+  }
+
+  private GobblinTrackingEvent createJobSLAKilledEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.FlowTimings.FLOW_SLA_KILLED;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);

Review Comment:
   seems like a lot of boilerplate/duplication here.  could we capture it within a common function that each of these could pass their specifics into?  e.g.
   ```
   GobblinTrackingEvent createGTE(String eventName, Map<String, String> customMetadata)
   ```



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -444,6 +445,103 @@ public void testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobSLAKilledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobStartSLAKilledEvent(),
+        // simulate another retry (maximum attempts = 2), but verify that the cancel event will not trigger a retry
+        createJobOrchestratedEvent(1),
+        createJobCancelledEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = new MockKafkaAvroJobStatusMonitor("test", config, 1);
+    jobStatusMonitor.buildMetricsContextAndMetrics();

Review Comment:
   could we encapsulate this?  e.g.
   ```MockKafkaAvroJobStatusMonitor createKAJSM(...)```
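   as a rough illustration of what such a factory might centralize, here is a stand-in using a plain `Map` instead of the typesafe `Config` built in the test (key names are simplified, not the real `ConfigurationKeys` constants):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class MonitorFactory {
       // Hypothetical createKAJSM(...)-style factory: the Kafka/state-store
       // config boilerplate lives in one place and callers pass only what
       // varies between tests.
       static Map<String, Object> monitorConfig(String brokers, String stateStoreDir) {
           Map<String, Object> cfg = new HashMap<>();
           cfg.put("kafka.brokers", brokers);
           cfg.put("value.deserializer",
               "org.apache.kafka.common.serialization.ByteArrayDeserializer");
           cfg.put("state.store.dir", stateStoreDir);
           cfg.put("zookeeper.connect", "localhost:2121");
           return cfg;
       }

       public static void main(String[] args) {
           Map<String, Object> cfg = monitorConfig("localhost:0000", "/tmp/state-store");
           System.out.println(cfg.get("kafka.brokers") + " " + cfg.size());
       }
   }
   ```

   the real factory would build the `Config` the same way, construct the `MockKafkaAvroJobStatusMonitor`, and call `buildMetricsContextAndMetrics()` before returning it.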





Issue Time Tracking
-------------------

    Worklog Id:     (was: 763170)
    Time Spent: 20m  (was: 10m)

> GaaS Flow SLA Kills should be retryable if configured
> -----------------------------------------------------
>
>                 Key: GOBBLIN-1634
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1634
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> On Gobblin as a Service, flows can fail SLAs if they do not receive a Kafka
> event within some designated amount of time.
> Since GaaS supports retries on failures, these SLA failures should also be
> retryable.
> However, if the flow is cancelled by a user-specified event through the API,
> we do not want to retry.
> Additionally, we also do not want to retry if a flow is skipped due to
> concurrent jobs running at the same time: without a more sophisticated
> waiting algorithm, the other job is unlikely to have finished by the time the
> skipped job is retried, wasting resources.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
