This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 5b881882f28 branch-4.0: [fix](streaming) Fix NPE in StreamingInsertJob when MetricRepo is not initialized during replay #61253 (#61295)
5b881882f28 is described below

commit 5b881882f28d266795842a729f0910e0a2ebc596
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 13 16:38:26 2026 +0800

    branch-4.0: [fix](streaming) Fix NPE in StreamingInsertJob when MetricRepo is not initialized during replay #61253 (#61295)
    
    Cherry-picked from #61253
    
    Co-authored-by: wudi <[email protected]>
---
 .../insert/streaming/StreamingInsertJob.java       | 51 ++++++++++++++--------
 1 file changed, 33 insertions(+), 18 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index c820b8d532a..0b0faa0f64f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -540,12 +540,16 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 // and auto resume will automatically wake it up.
                 this.updateJobStatus(JobStatus.PAUSED);
 
-                
MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
+                if (MetricRepo.isInit) {
+                    
MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
+                }
             }
         } finally {
             long end = System.currentTimeMillis();
-            MetricRepo.COUNTER_STREAMING_JOB_GET_META_LANTENCY.increase(end - 
start);
-            MetricRepo.COUNTER_STREAMING_JOB_GET_META_COUNT.increase(1L);
+            if (MetricRepo.isInit) {
+                
MetricRepo.COUNTER_STREAMING_JOB_GET_META_LANTENCY.increase(end - start);
+                MetricRepo.COUNTER_STREAMING_JOB_GET_META_COUNT.increase(1L);
+            }
         }
     }
 
@@ -592,7 +596,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             failedTaskCount.incrementAndGet();
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
             this.failureReason = new FailureReason(task.getErrMsg());
-            MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
+            if (MetricRepo.isInit) {
+                
MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
+            }
         } finally {
             writeUnlock();
         }
@@ -604,8 +610,11 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             resetFailureInfo(null);
             succeedTaskCount.incrementAndGet();
             //update metric
-            MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L);
-            
MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME.increase(task.getFinishTimeMs()
 - task.getStartTimeMs());
+            if (MetricRepo.isInit) {
+                
MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L);
+                MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME.increase(
+                        task.getFinishTimeMs() - task.getStartTimeMs());
+            }
 
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
             AbstractStreamingTask nextTask = createStreamingTask();
@@ -617,7 +626,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
-    private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment 
attachment) {
+    private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment 
attachment, boolean isReplay) {
         if (this.jobStatistic == null) {
             this.jobStatistic = new StreamingJobStatistic();
         }
@@ -628,11 +637,13 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
 
         //update metric
-        
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
-        
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
+        if (MetricRepo.isInit && !isReplay) {
+            
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
+            
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
+        }
     }
 
-    private void 
updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
+    private void 
updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment, 
boolean isReplay) {
         if (this.jobStatistic == null) {
             this.jobStatistic = new StreamingJobStatistic();
         }
@@ -643,8 +654,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
 
         //update metric
-        
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
-        
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
+        if (MetricRepo.isInit && !isReplay) {
+            
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
+            
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
+        }
     }
 
     private void updateJobStatisticAndOffset(CommitOffsetRequest 
offsetRequest) {
@@ -668,9 +681,11 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(offsetRequest.getOffset()));
 
         //update metric
-        
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(offsetRequest.getScannedRows());
-        
MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(offsetRequest.getFilteredRows());
-        
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(offsetRequest.getLoadBytes());
+        if (MetricRepo.isInit) {
+            
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(offsetRequest.getScannedRows());
+            
MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(offsetRequest.getFilteredRows());
+            
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(offsetRequest.getLoadBytes());
+        }
     }
 
     @Override
@@ -1006,7 +1021,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), 
txnState);
         StreamingTaskTxnCommitAttachment attachment =
                 (StreamingTaskTxnCommitAttachment) 
txnState.getTxnCommitAttachment();
-        updateJobStatisticAndOffset(attachment);
+        updateJobStatisticAndOffset(attachment, false);
     }
 
     @Override
@@ -1014,7 +1029,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), 
txnState);
         StreamingTaskTxnCommitAttachment attachment =
                 (StreamingTaskTxnCommitAttachment) 
txnState.getTxnCommitAttachment();
-        updateJobStatisticAndOffset(attachment);
+        updateJobStatisticAndOffset(attachment, true);
         succeedTaskCount.incrementAndGet();
     }
 
@@ -1058,7 +1073,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
         StreamingTaskTxnCommitAttachment commitAttach =
                 new 
StreamingTaskTxnCommitAttachment(response.getCommitAttach());
-        updateCloudJobStatisticAndOffset(commitAttach);
+        updateCloudJobStatisticAndOffset(commitAttach, true);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to