[
https://issues.apache.org/jira/browse/GOBBLIN-2211?focusedWorklogId=975358&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-975358
]
ASF GitHub Bot logged work on GOBBLIN-2211:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jul/25 18:27
Start Date: 20/Jul/25 18:27
Worklog Time Spent: 10m
Work Description: abhishekmjain commented on code in PR #4121:
URL: https://github.com/apache/gobblin/pull/4121#discussion_r2217907400
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java:
##########
@@ -186,8 +190,15 @@ binding time (optionally bound classes cannot have names associated with them),
if (serviceConfig.isJobStatusMonitorEnabled()) {
binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
+ binder.bind(ErrorClassifier.class);
+ binder.bind(ErrorPatternStore.class)
+ .to(getClassByNameOrAlias(ErrorPatternStore.class, serviceConfig.getInnerConfig(),
+ ServiceConfigKeys.ERROR_PATTERN_STORE_CLASS,
+ InMemoryErrorPatternStore.class.getName()));
+ binder.bind(ErrorPatternStore.class).to(MysqlErrorPatternStore.class);
}
-
+ binder.bind(MysqlErrorPatternStore.class);
+ binder.bind(InMemoryErrorPatternStore.class);
Review Comment:
Are these two bindings needed if we already bind `ErrorPatternStore.class` based on
config on line 194?
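
A minimal sketch of the config-driven selection the comment refers to, written with plain reflection rather than Guice so that it is self-contained. The names `StoreBindingSketch`, `PatternStore`, `InMemoryStore`, `MysqlStore`, and the key `errorPatternStore.class` are hypothetical stand-ins, not the PR's classes or Gobblin's `getClassByNameOrAlias`. The point is that once one implementation is chosen from config (with a default), the concrete classes need no separate registration.

```java
import java.util.Properties;

// Hypothetical stand-in for picking one store implementation from config.
final class StoreBindingSketch {
  interface PatternStore { String name(); }

  static final class InMemoryStore implements PatternStore {
    public String name() { return "in-memory"; }
  }

  static final class MysqlStore implements PatternStore {
    public String name() { return "mysql"; }
  }

  // Hypothetical config key; the PR uses ServiceConfigKeys.ERROR_PATTERN_STORE_CLASS.
  static final String STORE_CLASS_KEY = "errorPatternStore.class";

  // Resolves the configured class name, falling back to the in-memory store.
  static PatternStore resolve(Properties config) {
    String className = config.getProperty(STORE_CLASS_KEY, InMemoryStore.class.getName());
    try {
      Class<? extends PatternStore> impl =
          Class.forName(className).asSubclass(PatternStore.class);
      return impl.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate store " + className, e);
    }
  }

  public static void main(String[] args) {
    Properties config = new Properties();
    System.out.println(resolve(config).name());
    config.setProperty(STORE_CLASS_KEY, MysqlStore.class.getName());
    System.out.println(resolve(config).name());
  }
}
```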
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
- // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS
- this.eventProducer.emitObservabilityEvent(jobStatus);
+ if (isErrorClassificationEnabled) {
+ long startTime = System.currentTimeMillis();
Review Comment:
`startTime` can be initialized inside the `if` block below.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
- // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS
- this.eventProducer.emitObservabilityEvent(jobStatus);
+ if (isErrorClassificationEnabled) {
+ long startTime = System.currentTimeMillis();
+ if (jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())) {
+ List<Issue> issues = jobIssueEventHandler.getErrorListForClassification(
+ TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+ long process_duration = System.currentTimeMillis() - startTime;
+ log.info("Processing issues for job: {}, duration: {} ms", jobStatus, flowName, flowExecutionId, jobName, process_duration);
Review Comment:
fix indentation
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
- // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS
- this.eventProducer.emitObservabilityEvent(jobStatus);
+ if (isErrorClassificationEnabled) {
+ long startTime = System.currentTimeMillis();
+ if (jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())) {
+ List<Issue> issues = jobIssueEventHandler.getErrorListForClassification(
+ TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+ long process_duration = System.currentTimeMillis() - startTime;
+ log.info("Processing issues for job: {}, duration: {} ms", jobStatus, flowName, flowExecutionId, jobName, process_duration);
Review Comment:
The log message has 2 `{}` placeholders, but 5 arguments are being passed.
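
To illustrate the mismatch, here is a small sketch using a hypothetical stand-in for `{}` substitution (`PlaceholderDemo` is not SLF4J, only a simplified imitation of how positional placeholders are filled). With two placeholders and five arguments, only the first two arguments appear in the output; the duration is never printed. The "fixed" pattern is one possible correction that assumes the flow name, execution id, and job name are the values worth logging.

```java
// Hypothetical, simplified imitation of "{}" placeholder substitution.
final class PlaceholderDemo {
  // Counts the "{}" placeholders in a message pattern.
  static int countPlaceholders(String pattern) {
    int count = 0;
    int idx = pattern.indexOf("{}");
    while (idx >= 0) {
      count++;
      idx = pattern.indexOf("{}", idx + 2);
    }
    return count;
  }

  // Fills placeholders left to right; arguments beyond the last placeholder are dropped.
  static String format(String pattern, Object... args) {
    StringBuilder out = new StringBuilder();
    int from = 0;
    int argIdx = 0;
    while (argIdx < args.length) {
      int idx = pattern.indexOf("{}", from);
      if (idx < 0) {
        break;
      }
      out.append(pattern, from, idx).append(args[argIdx++]);
      from = idx + 2;
    }
    out.append(pattern.substring(from));
    return out.toString();
  }

  public static void main(String[] args) {
    // Pattern from the diff: 2 placeholders, 5 arguments.
    String original = "Processing issues for job: {}, duration: {} ms";
    System.out.println(format(original, "jobStatus", "flowName", 123L, "jobName", 42L));

    // One possible fix (an assumption): one placeholder per argument.
    String fixed = "Processing issues for flow: {}, execution: {}, job: {}, duration: {} ms";
    System.out.println(format(fixed, "flowName", 123L, "jobName", 42L));
  }
}
```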
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/IssueRepository.java:
##########
@@ -35,6 +35,9 @@ public interface IssueRepository {
List<Issue> getAll()
throws TroubleshooterException;
+ List<Issue> getAllTopRecentErrors(int limit)
Review Comment:
nit: can we call it `getMostRecentErrors` ?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -234,8 +248,31 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
- // do not send event if retry is required, because it can alert users to re-submit a job that is already set to be retried by GaaS
- this.eventProducer.emitObservabilityEvent(jobStatus);
+ if (isErrorClassificationEnabled) {
+ long startTime = System.currentTimeMillis();
+ if (jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())) {
+ List<Issue> issues = jobIssueEventHandler.getErrorListForClassification(
+ TroubleshooterUtils.getContextIdForJob(jobStatus.getProperties()));
+ long process_duration = System.currentTimeMillis() - startTime;
+ log.info("Processing issues for job: {}, duration: {} ms", jobStatus, flowName, flowExecutionId, jobName, process_duration);
+ try {
+ Issue finalIssue = errorClassifier.classifyEarlyStopWithDefault(issues);
+ if (finalIssue != null) {
+ jobIssueEventHandler.LogFinalError(finalIssue, flowName, flowGroup, String.valueOf(flowExecutionId),
+ jobName);
+ long final_duration = System.currentTimeMillis() - startTime;
+ log.info("Classified issues for job: {}, duration: {} ms", jobStatus,final_duration);
+ }
+ } catch (Exception e) {
+ log.error("Error classifying issues for job: {}", jobStatus, e);
+ long final_duration = System.currentTimeMillis() - startTime;
+ log.info("Error classification for job: {}, duration: {} ms", jobStatus, final_duration);
Review Comment:
Since `jobStatus` contains a lot of properties, let's skip adding it to logs.
We can use the constituents of contextId for logging instead.
Also, if we want to log the final duration in all scenarios, let's log it
once outside the try-catch block instead of repeating it.
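
A rough sketch of the suggested shape, under stated assumptions: `ClassificationTimingSketch`, its in-memory `LOG` list (standing in for a real logger), and the `Function`-based classifier are hypothetical and not part of the PR. It identifies the job by flow group, flow name, execution id, and job name rather than the full `jobStatus`, and records the duration once after the try-catch so that both the success and failure paths are covered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: time the classification once, log compact identifiers only.
final class ClassificationTimingSketch {
  // Stand-in for a logger so the sketch stays self-contained.
  static final List<String> LOG = new ArrayList<>();

  static String classify(List<String> issues, Function<List<String>, String> classifier,
      String flowGroup, String flowName, long flowExecutionId, String jobName) {
    // Compact job identifier instead of the full jobStatus object.
    String jobId = String.format("%s/%s/%d/%s", flowGroup, flowName, flowExecutionId, jobName);
    long startTime = System.currentTimeMillis();
    String finalIssue = null;
    try {
      finalIssue = classifier.apply(issues);
    } catch (Exception e) {
      LOG.add("ERROR classifying issues for job " + jobId + ": " + e.getMessage());
    }
    // Logged exactly once, whether classification succeeded or failed.
    long duration = System.currentTimeMillis() - startTime;
    LOG.add("Error classification for job " + jobId + " took " + duration + " ms");
    return finalIssue;
  }

  public static void main(String[] args) {
    classify(Arrays.asList("issue-a", "issue-b"), list -> list.get(0), "group", "flow", 1L, "job");
    classify(Arrays.asList("issue-a"), list -> { throw new IllegalStateException("boom"); },
        "group", "flow", 2L, "job");
    LOG.forEach(System.out::println);
  }
}
```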
Issue Time Tracking
-------------------
Worklog Id: (was: 975358)
Time Spent: 50m (was: 40m)
> Implement Error Classification based on execution issues
> --------------------------------------------------------
>
> Key: GOBBLIN-2211
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2211
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Abhishek Jain
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Implement Error Classification to categorize the failure reason based on
> issues encountered.