[ 
https://issues.apache.org/jira/browse/GOBBLIN-2211?focusedWorklogId=975358&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-975358
 ]

ASF GitHub Bot logged work on GOBBLIN-2211:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Jul/25 18:27
            Start Date: 20/Jul/25 18:27
    Worklog Time Spent: 10m 
      Work Description: abhishekmjain commented on code in PR #4121:
URL: https://github.com/apache/gobblin/pull/4121#discussion_r2217907400


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java:
##########
@@ -186,8 +190,15 @@ binding time (optionally bound classes cannot have names 
associated with them),
 
     if (serviceConfig.isJobStatusMonitorEnabled()) {
       
binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
+      binder.bind(ErrorClassifier.class);
+      binder.bind(ErrorPatternStore.class)
+          .to(getClassByNameOrAlias(ErrorPatternStore.class, 
serviceConfig.getInnerConfig(),
+              ServiceConfigKeys.ERROR_PATTERN_STORE_CLASS,
+              InMemoryErrorPatternStore.class.getName()));
+      binder.bind(ErrorPatternStore.class).to(MysqlErrorPatternStore.class);
     }
-
+    binder.bind(MysqlErrorPatternStore.class);
+    binder.bind(InMemoryErrorPatternStore.class);

Review Comment:
   Are these bindings still needed if we bind `ErrorPatternStore.class` based on the config on line 194?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void 
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
           boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
 
           if (updatedJobStatus.getRight() == NewState.FINISHED && 
!retryRequired) {
-            // do not send event if retry is required, because it can alert 
users to re-submit a job that is already set to be retried by GaaS
-            this.eventProducer.emitObservabilityEvent(jobStatus);
+            if (isErrorClassificationEnabled) {
+              long startTime = System.currentTimeMillis();

Review Comment:
   `startTime` can be initialized inside the `if` block below, since it is only used there.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void 
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
           boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
 
           if (updatedJobStatus.getRight() == NewState.FINISHED && 
!retryRequired) {
-            // do not send event if retry is required, because it can alert 
users to re-submit a job that is already set to be retried by GaaS
-            this.eventProducer.emitObservabilityEvent(jobStatus);
+            if (isErrorClassificationEnabled) {
+              long startTime = System.currentTimeMillis();
+              if 
(jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name()))
 {
+                List<Issue> issues = 
jobIssueEventHandler.getErrorListForClassification(
+                    
TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+                    long process_duration = System.currentTimeMillis() - 
startTime;
+                    log.info("Processing issues for job: {}, duration: {} ms", 
jobStatus, flowName, flowExecutionId, jobName, process_duration);

Review Comment:
   Please fix the indentation.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void 
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
           boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
 
           if (updatedJobStatus.getRight() == NewState.FINISHED && 
!retryRequired) {
-            // do not send event if retry is required, because it can alert 
users to re-submit a job that is already set to be retried by GaaS
-            this.eventProducer.emitObservabilityEvent(jobStatus);
+            if (isErrorClassificationEnabled) {
+              long startTime = System.currentTimeMillis();
+              if 
(jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name()))
 {
+                List<Issue> issues = 
jobIssueEventHandler.getErrorListForClassification(
+                    
TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+                    long process_duration = System.currentTimeMillis() - 
startTime;
+                    log.info("Processing issues for job: {}, duration: {} ms", 
jobStatus, flowName, flowExecutionId, jobName, process_duration);

Review Comment:
   The log message has 2 `{}` placeholders, whereas 5 arguments are being passed.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java:
##########
@@ -35,6 +35,9 @@ public interface IssueRepository {
   List<Issue> getAll()
       throws TroubleshooterException;
 
+  List<Issue> getAllTopRecentErrors(int limit)

Review Comment:
   Nit: can we call it `getMostRecentErrors`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void 
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
           boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
 
           if (updatedJobStatus.getRight() == NewState.FINISHED && 
!retryRequired) {
-            // do not send event if retry is required, because it can alert 
users to re-submit a job that is already set to be retried by GaaS
-            this.eventProducer.emitObservabilityEvent(jobStatus);
+            if (isErrorClassificationEnabled) {
+              long startTime = System.currentTimeMillis();
+              if 
(jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name()))
 {
+                List<Issue> issues = 
jobIssueEventHandler.getErrorListForClassification(
+                    
TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+                    long process_duration = System.currentTimeMillis() - 
startTime;
+                    log.info("Processing issues for job: {}, duration: {} ms", 
jobStatus, flowName, flowExecutionId, jobName, process_duration);
+                try {
+                  Issue finalIssue = 
errorClassifier.classifyEarlyStopWithDefault(issues);
+                  if (finalIssue != null) {
+                    jobIssueEventHandler.LogFinalError(finalIssue, flowName, 
flowGroup, String.valueOf(flowExecutionId),
+                        jobName);
+                    long final_duration = System.currentTimeMillis() - 
startTime;
+                    log.info("Classified issues for job: {}, duration: {} ms", 
jobStatus,final_duration);
+                  }
+                } catch (Exception e) {
+                  log.error("Error classifying issues for job: {}", jobStatus, 
e);
+                  long final_duration = System.currentTimeMillis() - startTime;
+                  log.info("Error classification for job: {}, duration: {} 
ms", jobStatus, final_duration);

Review Comment:
   Since `jobStatus` contains a lot of properties, let's skip adding it to the logs.
   We can use the constituents of the contextId for logging instead.
   
   Also, if we want to log the final duration in all scenarios, let's log it once
   outside the try-catch block instead of repeating it.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 975358)
    Time Spent: 50m  (was: 40m)

> Implement Error Classification based on execution issues
> --------------------------------------------------------
>
>                 Key: GOBBLIN-2211
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2211
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Abhishek Jain
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Implement Error Classification to categorize the failure reason based on the
> issues encountered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to