This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 382664fe551 [fix](streaming) Fix NPE in StreamingInsertJob when
MetricRepo is not initialized during replay (#61253)
382664fe551 is described below
commit 382664fe551ac89fb2e21d2e42ec1e2fc5ad6345
Author: wudi <[email protected]>
AuthorDate: Fri Mar 13 10:55:44 2026 +0800
[fix](streaming) Fix NPE in StreamingInsertJob when MetricRepo is not
initialized during replay (#61253)
### What problem does this PR solve?
#### Problem
`StreamingInsertJob.replayOnCommitted()` throws a `NullPointerException`
during FE replay:
```
java.lang.NullPointerException: Cannot invoke
"org.apache.doris.metric.LongCounterMetric.increase(java.lang.Long)"
because
"org.apache.doris.metric.MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS" is null
at
StreamingInsertJob.updateJobStatisticAndOffset(StreamingInsertJob.java:634)
at StreamingInsertJob.replayOnCommitted(StreamingInsertJob.java:1020)
at
TransactionState.replaySetTransactionStatus(TransactionState.java:589)
at
DatabaseTransactionMgr.replayUpsertTransactionState(DatabaseTransactionMgr.java:2636)
at
GlobalTransactionMgr.replayUpsertTransactionState(GlobalTransactionMgr.java:952)
```
The root cause is that `MetricRepo` may not yet be initialized when FE
replays transaction logs during startup,
but `StreamingInsertJob` unconditionally calls metric update methods,
which leads to the NPE shown above.
#### Fix
Two separate changes are applied to `StreamingInsertJob`:
1. **Skip metric updates during replay**: `updateJobStatisticAndOffset`
and
`updateCloudJobStatisticAndOffset` now accept an `isReplay` boolean
parameter. Call sites in
`replayOnCommitted` and `replayOnCloudMode` pass `true`, while
`afterCommitted` passes `false`.
2. **Guard all metric calls with `MetricRepo.isInit`**: Every
`MetricRepo.COUNTER_STREAMING_JOB_*`
call site is now wrapped with `if (MetricRepo.isInit && !isReplay)` (in the
two methods from item 1) or `if (MetricRepo.isInit)` (all other call sites)
to prevent an NPE if `MetricRepo` has not been fully initialized.
---
.../insert/streaming/StreamingInsertJob.java | 51 ++++++++++++++--------
1 file changed, 33 insertions(+), 18 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index e8d168eaa43..c801eb16ddb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -543,12 +543,16 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
// and auto resume will automatically wake it up.
this.updateJobStatus(JobStatus.PAUSED);
-
MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
+ if (MetricRepo.isInit) {
+
MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
+ }
}
} finally {
long end = System.currentTimeMillis();
- MetricRepo.COUNTER_STREAMING_JOB_GET_META_LANTENCY.increase(end -
start);
- MetricRepo.COUNTER_STREAMING_JOB_GET_META_COUNT.increase(1L);
+ if (MetricRepo.isInit) {
+
MetricRepo.COUNTER_STREAMING_JOB_GET_META_LANTENCY.increase(end - start);
+ MetricRepo.COUNTER_STREAMING_JOB_GET_META_COUNT.increase(1L);
+ }
}
}
@@ -595,7 +599,9 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
failedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
this.failureReason = new FailureReason(task.getErrMsg());
- MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
+ if (MetricRepo.isInit) {
+
MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
+ }
} finally {
writeUnlock();
}
@@ -607,8 +613,11 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
resetFailureInfo(null);
succeedTaskCount.incrementAndGet();
//update metric
- MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L);
-
MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME.increase(task.getFinishTimeMs()
- task.getStartTimeMs());
+ if (MetricRepo.isInit) {
+
MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L);
+ MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME.increase(
+ task.getFinishTimeMs() - task.getStartTimeMs());
+ }
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
AbstractStreamingTask nextTask = createStreamingTask();
@@ -620,7 +629,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
}
- private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment
attachment) {
+ private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment
attachment, boolean isReplay) {
if (this.jobStatistic == null) {
this.jobStatistic = new StreamingJobStatistic();
}
@@ -631,11 +640,13 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
//update metric
-
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
-
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
+ if (MetricRepo.isInit && !isReplay) {
+
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
+
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
+ }
}
- private void
updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
+ private void
updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment,
boolean isReplay) {
if (this.jobStatistic == null) {
this.jobStatistic = new StreamingJobStatistic();
}
@@ -646,8 +657,10 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
//update metric
-
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
-
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
+ if (MetricRepo.isInit && !isReplay) {
+
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
+
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
+ }
}
private void updateJobStatisticAndOffset(CommitOffsetRequest
offsetRequest) {
@@ -671,9 +684,11 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
offsetProvider.updateOffset(offsetProvider.deserializeOffset(offsetRequest.getOffset()));
//update metric
-
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(offsetRequest.getScannedRows());
-
MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(offsetRequest.getFilteredRows());
-
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(offsetRequest.getLoadBytes());
+ if (MetricRepo.isInit) {
+
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(offsetRequest.getScannedRows());
+
MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(offsetRequest.getFilteredRows());
+
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(offsetRequest.getLoadBytes());
+ }
}
@Override
@@ -1009,7 +1024,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
Preconditions.checkNotNull(txnState.getTxnCommitAttachment(),
txnState);
StreamingTaskTxnCommitAttachment attachment =
(StreamingTaskTxnCommitAttachment)
txnState.getTxnCommitAttachment();
- updateJobStatisticAndOffset(attachment);
+ updateJobStatisticAndOffset(attachment, false);
}
@Override
@@ -1017,7 +1032,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
Preconditions.checkNotNull(txnState.getTxnCommitAttachment(),
txnState);
StreamingTaskTxnCommitAttachment attachment =
(StreamingTaskTxnCommitAttachment)
txnState.getTxnCommitAttachment();
- updateJobStatisticAndOffset(attachment);
+ updateJobStatisticAndOffset(attachment, true);
succeedTaskCount.incrementAndGet();
}
@@ -1061,7 +1076,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
StreamingTaskTxnCommitAttachment commitAttach =
new
StreamingTaskTxnCommitAttachment(response.getCommitAttach());
- updateCloudJobStatisticAndOffset(commitAttach);
+ updateCloudJobStatisticAndOffset(commitAttach, true);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]