github-actions[bot] commented on code in PR #61826:
URL: https://github.com/apache/doris/pull/61826#discussion_r3008453531
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -51,6 +51,7 @@
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
+import org.apache.doris.job.offset.jdbc.JdbcTvfSourceOffsetProvider;
import org.apache.doris.job.util.StreamingJobUtils;
Review Comment:
**[High] RPC under write lock**: `getCommitOffsetJson()` is called while the
job's write lock is held (acquired at line 1016). For
`JdbcTvfSourceOffsetProvider`, this method calls `fetchTaskEndOffset()` which
performs BRPC calls to backend nodes. Performing RPC under metadata locks is
problematic: if the BE is slow or unresponsive, the FE thread will be blocked
while holding the write lock, preventing all other operations on this job.
Suggestion: Move the `getCommitOffsetJson()` call **before** the
`writeLock()` acquisition. The running task state should be stable at this
point since the task has completed execution. The lock is only needed for the
`txnState.setTxnCommitAttachment()` call.
```java
// Fetch offset BEFORE acquiring lock (involves RPC)
String offsetJson = offsetProvider.getCommitOffsetJson(
        runningStreamTask.getRunningOffset(),
        runningStreamTask.getTaskId(),
        runningStreamTask.getScanBackendIds());
if (StringUtils.isBlank(offsetJson)) {
    throw new TransactionException(...);
}
writeLock();
try {
    // only hold lock for the metadata mutation
    ...
} finally {
    writeUnlock(); // also fix the pre-existing shouldReleaseLock bug
}
```
##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -51,24 +62,34 @@ private void processProps(Map<String, String> properties) throws AnalysisException
Map<String, String> copyProps = new HashMap<>(properties);
copyProps.put("format", "json");
super.parseCommonProperties(copyProps);
- this.processedParams.put("enable_cdc_client", "true");
- this.processedParams.put("uri", URI);
- this.processedParams.put("http.enable.range.request", "false");
Review Comment:
**[Low] Side-effecting `remove()` on input map**:
`properties.remove(TASK_ID_KEY)` and `properties.remove(META_KEY)` mutate the
`properties` map that was passed in. Since
`recordRequest.setConfig(properties)` (line 82) sets the same map reference,
these removes also affect `recordRequest.config`. While currently the caller
(`rewriteTvfParams`) creates a fresh `HashMap`, this is a fragile implicit
contract.
Consider either:
1. Using `properties.get(TASK_ID_KEY)` + `properties.get(META_KEY)` instead of
`remove()`, and let the serialization naturally exclude them via `@JsonIgnore`
or similar
2. Or at minimum add a comment explaining why `remove()` is used (to exclude
these keys from the config map serialized into the JSON payload)
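For illustration, a minimal standalone sketch of the aliasing hazard (the `requestConfig` variable is a hypothetical stand-in for `recordRequest.config`, which holds the same map reference):

```java
import java.util.HashMap;
import java.util.Map;

public class RemoveAliasingDemo {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("task_id", "42");
        properties.put("meta", "{}");
        properties.put("jdbc.url", "jdbc:mysql://host/db");

        // Stand-in for recordRequest.setConfig(properties):
        // the request holds the SAME map reference, not a copy.
        Map<String, String> requestConfig = properties;

        // remove() mutates the shared map, so the key silently
        // disappears from requestConfig as well.
        String taskId = properties.remove("task_id");
        System.out.println(requestConfig.containsKey("task_id")); // false

        // Safer: read without mutating, and copy before excluding
        // keys from the serialized payload.
        String meta = properties.get("meta");
        Map<String, String> payload = new HashMap<>(properties);
        payload.remove("meta");
        System.out.println(requestConfig.containsKey("meta")); // true
    }
}
```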
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.jdbc;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.split.AbstractSourceSplit;
+import org.apache.doris.job.cdc.split.BinlogSplit;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * OffsetProvider for cdc_stream TVF path.
+ *
+ * <p>Differs from JdbcSourceOffsetProvider (non-TVF path) in:
+ * <ul>
+ * <li>offset commit: FE pulls actual end offset from BE via /api/getTaskOffset/{taskId} in
+ * beforeCommitted, stores in txn attachment (transactionally safe)</li>
+ * <li>cloud mode snapshot: attachment carries cumulative chunkHighWatermarkMap so that
+ * replayOnCloudMode can recover full state from the single latest attachment in MS</li>
+ * <li>recovery: state is rebuilt from txn replay (chunkHighWatermarkMap populated by
+ * replayOnCommitted/replayOnCloudMode -> updateOffset), not from EditLog</li>
+ * <li>updateOffset: during replay remainingSplits is empty so removeIf returns false naturally;
+ * chunkHighWatermarkMap is always updated unconditionally to support recovery</li>
+ * <li>replayIfNeed: checks currentOffset directly — snapshot triggers remainingSplits rebuild
+ * from meta + chunkHighWatermarkMap; binlog needs no action (currentOffset already set)</li>
+ * </ul>
+ */
+@Log4j2
+public class JdbcTvfSourceOffsetProvider extends JdbcSourceOffsetProvider {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ /**
+ * No-arg constructor required by SourceOffsetProviderFactory.createSourceOffsetProvider().
+ */
+ public JdbcTvfSourceOffsetProvider() {
+ super();
+ }
+
+ /**
+ * Initializes provider state and fetches snapshot splits from BE.
+ * splitChunks is called here (rather than in StreamingInsertJob) to keep
+ * all cdc_stream-specific init logic inside the provider.
+ */
+ @Override
+ public void ensureInitialized(Long jobId, Map<String, String> originTvfProps) throws JobException {
+ if (this.jobId != null) {
+ return;
+ }
+ this.jobId = jobId;
+ this.sourceProperties = originTvfProps;
+ this.chunkHighWatermarkMap = new HashMap<>();
+ String type = originTvfProps.get(DataSourceConfigKeys.TYPE);
+ Preconditions.checkArgument(type != null, "type is required");
+ this.sourceType = DataSourceType.valueOf(type.toUpperCase());
+ this.snapshotParallelism = Integer.parseInt(
+ originTvfProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+ DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
+ String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
+ Preconditions.checkArgument(table != null, "table is required for cdc_stream TVF");
+ }
+
+ /**
+ * Called once on fresh job creation (not on FE restart).
+ * Fetches snapshot splits from BE and persists them to the meta table.
+ */
+ @Override
+ public void initOnCreate() throws JobException {
+ String table = sourceProperties.get(DataSourceConfigKeys.TABLE);
+ splitChunks(Collections.singletonList(table));
+ }
+
+ /**
+ * Rewrites the cdc_stream TVF SQL with current offset meta and taskId,
+ * so the BE knows where to start reading and can report
+ * the end offset back via taskOffsetCache.
+ */
+ @Override
+ public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand,
+ Offset runningOffset, long taskId) {
+ JdbcOffset offset = (JdbcOffset) runningOffset;
+ Map<String, String> props = new HashMap<>();
+ Plan rewritePlan = originCommand.getParsedPlan().get().rewriteUp(plan -> {
+ if (plan instanceof UnboundTVFRelation) {
+ UnboundTVFRelation originTvfRel = (UnboundTVFRelation) plan;
+ props.putAll(originTvfRel.getProperties().getMap());
+ props.put(CdcStreamTableValuedFunction.META_KEY, new Gson().toJson(offset.generateMeta()));
+ props.put(CdcStreamTableValuedFunction.JOB_ID_KEY, String.valueOf(jobId));
+ props.put(CdcStreamTableValuedFunction.TASK_ID_KEY, String.valueOf(taskId));
+ return new UnboundTVFRelation(
+ originTvfRel.getRelationId(), originTvfRel.getFunctionName(), new Properties(props));
+ }
+ return plan;
+ });
+ InsertIntoTableCommand cmd = new InsertIntoTableCommand((LogicalPlan) rewritePlan,
+ Optional.empty(), Optional.empty(), Optional.empty(), true, Optional.empty());
+ cmd.setJobId(originCommand.getJobId());
+ return cmd;
+ }
+
+ /**
+ * Returns the serialized JSON offset to store in txn commit attachment.
+ *
+ * <p>Calls BE /api/getTaskOffset/{taskId} to get the actual end offset recorded after
+ * fetchRecordStream completes (stored in PipelineCoordinator.taskOffsetCache).
+ *
+ * <p>For cloud + snapshot: returns cumulative list (all previously completed chunks +
+ * current task's new splits) so that replayOnCloudMode can recover full state from latest attachment.
+ * For non-cloud snapshot / binlog: returns only current task's splits.
+ */
+ @Override
+ public String getCommitOffsetJson(Offset runningOffset, long taskId, List<Long> scanBackendIds) {
+ List<Map<String, String>> currentTaskEndOffset = fetchTaskEndOffset(taskId, scanBackendIds);
+ if (CollectionUtils.isEmpty(currentTaskEndOffset)) {
+ return "";
+ }
+
+ // Cloud + snapshot: prepend all previously completed chunks so the attachment is
+ // self-contained for replayOnCloudMode (MS only keeps the latest attachment)
+ if (Config.isCloudMode() && ((JdbcOffset) runningOffset).snapshotSplit()) {
+ List<Map<String, String>> cumulative = buildCumulativeSnapshotOffset(currentTaskEndOffset);
+ return new Gson().toJson(cumulative);
+ }
+ return new Gson().toJson(currentTaskEndOffset);
+ }
+
+ /**
+ * Queries each scan backend in order until one returns a non-empty offset for this taskId.
+ * Only the BE that ran the cdc_stream TVF scan node will have the entry in taskOffsetCache.
+ */
+ private List<Map<String, String>> fetchTaskEndOffset(long taskId, List<Long> scanBackendIds) {
Review Comment:
**[High] `future.get()` without timeout**: At line 198, `future.get()` is
called without a timeout. If the BE is unresponsive or the CDC client is hung,
this call will block the FE thread indefinitely. This is especially dangerous
because this method is called from the transaction commit path
(`beforeCommitted`).
Use `future.get(timeout, TimeUnit.MILLISECONDS)` with a reasonable timeout
(e.g., 30 seconds):
```java
InternalService.PRequestCdcClientResult result = future.get(30000, TimeUnit.MILLISECONDS);
```
And catch `TimeoutException` appropriately.
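A self-contained sketch of the bounded-wait pattern, with a plain `ExecutorService` standing in for the BRPC future (the 200 ms timeout and names are illustrative only):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Stand-in for the BRPC call; a hung BE would never complete.
        Future<String> future = pool.submit(() -> {
            Thread.sleep(10_000);
            return "offset";
        });
        try {
            String result = future.get(200, TimeUnit.MILLISECONDS);
            System.out.println("got " + result);
        } catch (TimeoutException e) {
            // Bound the wait: cancel the call and surface a retriable
            // error instead of blocking the commit path indefinitely.
            future.cancel(true);
            System.out.println("timed out");
        } finally {
            pool.shutdownNow();
        }
    }
}
```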
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]