phet commented on code in PR #3495:
URL: https://github.com/apache/gobblin/pull/3495#discussion_r860238661


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -444,6 +445,103 @@ public void 
testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws 
IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobSLAKilledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobStartSLAKilledEvent(),
+        // simulate retry again as maximum attempt is 2, but verify that kill 
event will not retry
+        createJobOrchestratedEvent(1),
+        createJobCancelledEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
+        
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"));
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor =  new 
MockKafkaAvroJobStatusMonitor("test",config, 1);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+      this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    StateStore stateStore = jobStatusMonitor.getStateStore();
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, 
flowName);
+    String tableName = 
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+    List<State> stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    State state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, 
this.jobGroup, this.jobName);
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Because the maximum attempt is set to 2, so the state is set to 
PENDING_RETRY after the first failure
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+    
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());

Review Comment:
   I do see the difficulty of potentially async publication (is that why you 
`Thread.sleep(1000)`?) with the events.  that said, I have difficulty at this 
point recalling which message should be coming from each particular 
`recordIterator.next()`.
   
   there may not be a good approach here, but ones we could always consider 
would be to:
   a. publish one message at a time (and keep reading one by one from same 
`recordIterator`)
   b. write an abstraction to publish one message and return a `recordIterator` 
for it (or better still, a single element from the iterator)
   c. condense each set of assertions (following `stateList = 
stateStore.getAll(storeName, tableName)`) as a lambda, then defining each event 
in the same place near its corresponding assertion lambda.



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java:
##########
@@ -72,6 +72,8 @@ public static class FlowTimings {
     public static final String FLOW_FAILED = "FlowFailed";
     public static final String FLOW_RUNNING = "FlowRunning";
     public static final String FLOW_CANCELLED = "FlowCancelled";
+    public static final String FLOW_SLA_KILLED = "FlowSLAKilled";
+    public static final String FLOW_START_SLA_KILLED = "FlowStartSLAKilled";

Review Comment:
   \<meta>I'm having trouble separating what may be a personal pet peeve vs. 
overall clarity.\</meta>
   
   I find 'SLA' a confusing term for what is essentially a deadline.  therefore 
I'd suggest semantic names in our state space, such as:
   ```
   FLOW_RUN_DEADLINE_EXCEEDED
   FLOW_START_DEADLINE_EXCEEDED
   ```
   \<meta>still, I don't wish to insist this is semantically clearer for future 
maintainers (seems like users won't observe the state, correct?), should you 
find this merely personal preference on my part.\</meta>
   
   for background, an SLA is an enforceable expectation.  it should be monitored 
with deviations detected, but I am not familiar w/ any precedent for 
immediately terminating processing upon violation.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -279,7 +278,8 @@ static void 
addJobStatusToStateStore(org.apache.gobblin.configuration.State jobS
     }
 
     modifyStateIfRetryRequired(jobStatus);
-
+    // Remove data not needed to be stored in state store
+    jobStatus.removeProp(TimingEvent.FlowEventConstants.IS_FLOW_SLA_KILLED);

Review Comment:
   could be a case for putting the `removeProp` within `modifyStateIf...`



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java:
##########
@@ -72,6 +72,8 @@ public static class FlowTimings {
     public static final String FLOW_FAILED = "FlowFailed";
     public static final String FLOW_RUNNING = "FlowRunning";
     public static final String FLOW_CANCELLED = "FlowCancelled";
+    public static final String FLOW_SLA_KILLED = "FlowSLAKilled";
+    public static final String FLOW_START_SLA_KILLED = "FlowStartSLAKilled";

Review Comment:
   and overall, I didn't initially see a way to define these semantics w/o 
adding new `ExecutionStatus` states, but that's what you did, and AFAICT it's a 
better approach!



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -558,6 +656,63 @@ private GobblinTrackingEvent createJobFailedEvent() {
     GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, 
namespace, name, metadata);
     return event;
   }
+
+  private GobblinTrackingEvent createJobCancelledEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.FlowTimings.FLOW_CANCELLED;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
this.jobGroup);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, 
this.jobExecutionId);
+    metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
+    metadata.put(TimingEvent.METADATA_START_TIME, "7");
+    metadata.put(TimingEvent.METADATA_END_TIME, "8");
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, 
namespace, name, metadata);
+    return event;
+  }
+
+  private GobblinTrackingEvent createJobSLAKilledEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.FlowTimings.FLOW_SLA_KILLED;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
this.jobGroup);

Review Comment:
   seems like a lot of boilerplate/duplication here.  could we capture it within a 
common function that each of these could pass their specifics into?  e.g.
   ```
   GobblinTrackingEvent createGTE(String eventName, Map<String, String> 
customMetadata)
   ```



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -444,6 +445,103 @@ public void 
testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws 
IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobSLAKilledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobStartSLAKilledEvent(),
+        // simulate retry again as maximum attempt is 2, but verify that kill 
event will not retry
+        createJobOrchestratedEvent(1),
+        createJobCancelledEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
+        
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"));
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor =  new 
MockKafkaAvroJobStatusMonitor("test",config, 1);
+    jobStatusMonitor.buildMetricsContextAndMetrics();

Review Comment:
   could we encapsulate this?  e.g.
   ```MockKafkaAvroJobStatusMonitor createKAJSM(...)```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to