[ 
https://issues.apache.org/jira/browse/GOBBLIN-1634?focusedWorklogId=763203&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-763203
 ]

ASF GitHub Bot logged work on GOBBLIN-1634:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Apr/22 00:18
            Start Date: 28/Apr/22 00:18
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3495:
URL: https://github.com/apache/gobblin/pull/3495#discussion_r860338652


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -444,6 +445,103 @@ public void 
testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
     jobStatusMonitor.shutDown();
   }
 
+  @Test (dependsOnMethods = 
"testProcessingRetriedForApparentlyTransientErrors")
+  public void testProcessMessageForCancelledAndKilledEvent() throws 
IOException, ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic4");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobSLAKilledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobStartSLAKilledEvent(),
+        // simulate retry again as maximum attempt is 2, but verify that kill 
event will not retry
+        createJobOrchestratedEvent(1),
+        createJobCancelledEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
+        
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"));
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor =  new 
MockKafkaAvroJobStatusMonitor("test",config, 1);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
+      this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    StateStore stateStore = jobStatusMonitor.getStateStore();
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, 
flowName);
+    String tableName = 
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+    List<State> stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    State state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, 
this.jobGroup, this.jobName);
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Because the maximum attempt is set to 2, so the state is set to 
PENDING_RETRY after the first failure
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+    
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    //Job orchestrated for retrying
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());

Review Comment:
   I think moreso than async publication it's delay between publication and 
reads, since the test suite does seem to actually push to a test kafka instance 
that is read directly.
   
   I think I can try something like the third option where I can map an event 
to an expected assertion (or set of assertions)





Issue Time Tracking
-------------------

    Worklog Id:     (was: 763203)
    Time Spent: 50m  (was: 40m)

> GaaS Flow SLA Kills should be retryable if configured
> -----------------------------------------------------
>
>                 Key: GOBBLIN-1634
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1634
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> On Gobblin as a Service flows can fail SLAs if they do not receive a Kafka 
> event in some designated amount of time.
> Since GaaS supports retrys on failures, these failures due to SLAs should 
> also be retryable.
> However, if the flow is cancelled from a user specified event through the API 
> we do not want to retry.
> Additionally, we also do not want to retry if a flow is skipped due to 
> concurrent jobs running at the same time, as it is unlikely without a more 
> sophisticated waiting algorithm that the job will be finished by the time the 
> job is retried again, wasting resources.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to