Copilot commented on code in PR #64566:
URL: https://github.com/apache/doris/pull/64566#discussion_r3419712735
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -737,8 +739,20 @@ public void advanceSplitsIfNeed() throws JobException {
if (offsetProvider.noMoreSplits()) {
return;
}
+ // Admit as many splits as possible this tick, up to MAX_PENDING_SPLITS backlog, then yield
+ // before the next tick (deadline) which resumes the rest.
+ long intervalMs = jobProperties.getMaxIntervalSecond() * 1000L;
+ long deadline = System.currentTimeMillis() + intervalMs - Math.min(1000L, intervalMs / 10);
try {
- offsetProvider.advanceSplits();
+ while (!offsetProvider.noMoreSplits()
+ && offsetProvider.pendingSplitCount() < MAX_PENDING_SPLITS
+ && System.currentTimeMillis() < deadline) {
+ int before = offsetProvider.pendingSplitCount();
+ offsetProvider.advanceSplits();
+ if (offsetProvider.pendingSplitCount() <= before) {
+ break; // nothing produced this round; avoid spin
+ }
+ }
Review Comment:
The 'no-progress' check uses `pendingSplitCount()` as a proxy for
'advanceSplits produced something'. If the consumer drains the pending queue
concurrently, `pendingSplitCount()` can stay flat or even decrease after
`advanceSplits()` despite producing new splits, causing the loop to break
prematurely and reducing snapshot throughput. Consider changing
`advanceSplits()` (or adding an additional API) to return a produced-split
count/boolean, or track a monotonic 'producedSplits' counter/version from the
provider to detect no-progress without being affected by concurrent consumption.
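Below is a minimal sketch of the return-value option. It assumes a hypothetical `CountingSplitProvider` whose `advanceSplits()` returns the number of splits it produced; the real `SourceOffsetProvider.advanceSplits()` signature is not shown in this diff. `DrainingProvider` is an illustrative test double that simulates a consumer emptying the queue after every call. `admitByBacklogDelta` reproduces the loop from the hunk above so the two can be compared.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical contract: advanceSplits() reports how many splits it produced,
// so progress no longer has to be inferred from the backlog size.
interface CountingSplitProvider {
    boolean noMoreSplits();

    int pendingSplitCount();

    int advanceSplits();
}

final class AdmissionLoop {
    // Progress is detected from advanceSplits()'s return value.
    static int admit(CountingSplitProvider p, int maxPending, long deadlineNanos) {
        int rounds = 0;
        while (!p.noMoreSplits()
                && p.pendingSplitCount() < maxPending
                && System.nanoTime() - deadlineNanos < 0) {
            rounds++;
            if (p.advanceSplits() == 0) {
                break; // nothing produced: stop instead of spinning
            }
        }
        return rounds;
    }

    // The PR's heuristic: progress is inferred from the backlog growing.
    static int admitByBacklogDelta(CountingSplitProvider p, int maxPending, long deadlineNanos) {
        int rounds = 0;
        while (!p.noMoreSplits()
                && p.pendingSplitCount() < maxPending
                && System.nanoTime() - deadlineNanos < 0) {
            rounds++;
            int before = p.pendingSplitCount();
            p.advanceSplits();
            if (p.pendingSplitCount() <= before) {
                break;
            }
        }
        return rounds;
    }
}

// Test double: a simulated consumer drains the queue as fast as it is filled,
// so the backlog stays flat even though every round produces splits.
final class DrainingProvider implements CountingSplitProvider {
    private final Deque<Integer> pending = new ArrayDeque<>();
    private int remaining;

    DrainingProvider(int totalSplits) {
        this.remaining = totalSplits;
    }

    @Override
    public boolean noMoreSplits() {
        return remaining == 0;
    }

    @Override
    public int pendingSplitCount() {
        return pending.size();
    }

    @Override
    public int advanceSplits() {
        int produced = Math.min(remaining, 4);
        for (int i = 0; i < produced; i++) {
            pending.add(i);
        }
        remaining -= produced;
        pending.clear(); // simulated concurrent consumption
        return produced;
    }
}
```

With 20 splits and the simulated drain, the counter-based loop admits every split in 5 rounds. The backlog-delta check stops after the first round because the backlog never grows.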
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -737,8 +739,20 @@ public void advanceSplitsIfNeed() throws JobException {
if (offsetProvider.noMoreSplits()) {
return;
}
+ // Admit as many splits as possible this tick, up to MAX_PENDING_SPLITS backlog, then yield
+ // before the next tick (deadline) which resumes the rest.
Review Comment:
The comment says the loop admits 'up to MAX_PENDING_SPLITS backlog', but the
condition is `< MAX_PENDING_SPLITS`, which allows a single `advanceSplits()`
call to overshoot the cap (as the new UT also expects). Either (a) update the
comment to reflect that the loop may cross the cap by one round, or (b) enforce
the cap strictly by breaking immediately after `advanceSplits()` if the backlog
is now `>= MAX_PENDING_SPLITS`.
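Option (b) has a limit. A check after the call can only prevent further rounds. A single `advanceSplits()` call can still add more splits than the remaining headroom, so a strict cap needs the provider itself to honour a budget. Below is a minimal sketch, assuming a hypothetical `advanceSplits(int maxToProduce)` overload that does not exist in this diff. `ChunkyProvider` is an illustrative test double that would emit 100 splits per call if unconstrained.

```java
// Hypothetical variant: the caller passes the remaining headroom and the
// provider promises to produce at most that many splits.
interface BudgetedSplitProvider {
    boolean noMoreSplits();

    int pendingSplitCount();

    int advanceSplits(int maxToProduce);
}

final class StrictCapLoop {
    static void admit(BudgetedSplitProvider p, int maxPending, long deadlineNanos) {
        while (!p.noMoreSplits() && System.nanoTime() - deadlineNanos < 0) {
            int headroom = maxPending - p.pendingSplitCount();
            if (headroom <= 0) {
                break; // backlog already at the cap
            }
            if (p.advanceSplits(headroom) == 0) {
                break; // no progress this round
            }
        }
    }
}

// Test double that produces up to `chunk` splits per call, capped by the budget.
final class ChunkyProvider implements BudgetedSplitProvider {
    private final int chunk;
    private int remaining;
    private int pending;

    ChunkyProvider(int totalSplits, int chunk) {
        this.remaining = totalSplits;
        this.chunk = chunk;
    }

    @Override
    public boolean noMoreSplits() {
        return remaining == 0;
    }

    @Override
    public int pendingSplitCount() {
        return pending;
    }

    @Override
    public int advanceSplits(int maxToProduce) {
        int produced = Math.min(Math.min(chunk, maxToProduce), remaining);
        pending += produced;
        remaining -= produced;
        return produced;
    }

    int remaining() {
        return remaining;
    }
}
```

With a cap of 512 and 100 splits per call, this loop stops at exactly 512. The loop in the hunk above would reach 600, because its last round starts at a backlog of 500.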
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -737,8 +739,20 @@ public void advanceSplitsIfNeed() throws JobException {
if (offsetProvider.noMoreSplits()) {
return;
}
+ // Admit as many splits as possible this tick, up to MAX_PENDING_SPLITS backlog, then yield
+ // before the next tick (deadline) which resumes the rest.
+ long intervalMs = jobProperties.getMaxIntervalSecond() * 1000L;
+ long deadline = System.currentTimeMillis() + intervalMs - Math.min(1000L, intervalMs / 10);
Review Comment:
Using `System.currentTimeMillis()` for relative deadline calculations is
vulnerable to wall-clock adjustments (NTP/time changes), which can
unintentionally extend or shorten the loop run time. Prefer a monotonic time
source for elapsed/deadline comparisons. For example, compute the deadline from
`System.nanoTime()` and, inside the loop, test `System.nanoTime() - deadline < 0`.
`nanoTime()` values are only meaningful as differences, so compare the
difference rather than the raw values.
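Below is a minimal sketch of the same per-tick budget on the monotonic clock. It keeps the PR's arithmetic (interval minus `min(1000 ms, interval / 10)`). `TickDeadline` is a hypothetical helper name, and `maxIntervalSecond` stands in for `jobProperties.getMaxIntervalSecond()`.

```java
import java.util.concurrent.TimeUnit;

// Deadline for one scheduling tick, measured on the monotonic nanoTime() clock.
final class TickDeadline {
    private final long deadlineNanos;

    TickDeadline(long maxIntervalSecond) {
        this.deadlineNanos = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(budgetMs(maxIntervalSecond));
    }

    // Same budget as the PR: the interval minus min(1 s, interval / 10) of headroom.
    static long budgetMs(long maxIntervalSecond) {
        long intervalMs = maxIntervalSecond * 1000L;
        return intervalMs - Math.min(1000L, intervalMs / 10);
    }

    // nanoTime() values are only meaningful as differences (they may be negative
    // or wrap), so the comparison is done on the difference, not the raw values.
    boolean expired() {
        return System.nanoTime() - deadlineNanos >= 0;
    }
}
```

The loop condition would then read `!deadline.expired()` in place of `System.currentTimeMillis() < deadline`.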
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java:
##########
@@ -198,6 +198,11 @@ default boolean noMoreSplits() {
return true;
}
+ /** Splits produced but not yet consumed (FE-side backlog). */
+ default int pendingSplitCount() {
+ return 0;
+ }
Review Comment:
Because `StreamingInsertJob` uses `pendingSplitCount()` as a safety valve to
bound FE-side backlog, a default implementation that always returns 0 silently
disables that safety for any provider that forgets to override it. Consider
strengthening the contract by documenting that providers supporting
`advanceSplits()` must override `pendingSplitCount()`, or by returning a
sentinel (and having the caller handle it explicitly) to avoid silently
bypassing the cap.
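Below is a minimal sketch of the sentinel option. `SplitBacklog`, `UNKNOWN_PENDING` and `BacklogGuard` are hypothetical names. A provider that does not report its backlog falls back to the pre-PR behaviour of a single `advanceSplits()` call per tick. This replaces the current approach, where `0` is silently read as "empty backlog".

```java
// Hypothetical contract change: the default reports "unknown" instead of 0.
interface SplitBacklog {
    int UNKNOWN_PENDING = -1;

    default int pendingSplitCount() {
        return UNKNOWN_PENDING;
    }
}

final class BacklogGuard {
    // Decides whether the admission loop may run another advanceSplits() round.
    // An unknown backlog explicitly allows only the first round of the tick.
    static boolean mayAdmitAnotherRound(SplitBacklog p, int maxPending, int roundsSoFar) {
        int pending = p.pendingSplitCount();
        if (pending == SplitBacklog.UNKNOWN_PENDING) {
            return roundsSoFar == 0;
        }
        return pending < maxPending;
    }
}
```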
##########
fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobAdvanceSplitsTest.java:
##########
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.offset.SourceOffsetProvider;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Covers the bounded admission loop in {@link StreamingInsertJob#advanceSplitsIfNeed()}:
+ * it must keep admitting within one tick yet terminate on every in-loop exit condition
+ * (noMoreSplits / pending cap / no-progress round) without spinning. The deadline term is
+ * wall-clock based and is covered by integration/regression rather than here.
+ */
+public class StreamingInsertJobAdvanceSplitsTest {
+
+ // Assumed value of StreamingInsertJob.MAX_PENDING_SPLITS; update both together if changed.
+ private static final int CAP = 512;
Review Comment:
The test hard-codes `512` and relies on a manual sync with
`StreamingInsertJob.MAX_PENDING_SPLITS`. To avoid future drift, consider
reading the constant reflectively (since this test already uses
`Deencapsulation`) or exposing it in a test-visible way (e.g., package-private
getter/constant) so the test automatically stays consistent with production.
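Below is a minimal sketch of the reflective option using plain `java.lang.reflect`. It assumes `MAX_PENDING_SPLITS` is a static `int` field on `StreamingInsertJob`, because its declaration is not part of this diff. `StreamingJobStandIn` is only a stand-in so the sketch compiles on its own.

```java
import java.lang.reflect.Field;

final class ConstantReader {
    // Reads a (possibly private) static int field by name.
    static int readStaticInt(Class<?> owner, String name) {
        try {
            Field f = owner.getDeclaredField(name);
            f.setAccessible(true);
            return f.getInt(null); // null receiver: the field is static
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read " + owner.getName() + "." + name, e);
        }
    }
}

// Stand-in for StreamingInsertJob; the real field's modifiers are assumed.
final class StreamingJobStandIn {
    private static final int MAX_PENDING_SPLITS = 512;
}
```

In the test, the hard-coded value would become something like `private static final int CAP = ConstantReader.readStaticInt(StreamingInsertJob.class, "MAX_PENDING_SPLITS");`. A package-private constant would avoid reflection altogether, at the cost of widening its visibility.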
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]