github-actions[bot] commented on code in PR #60116: URL: https://github.com/apache/doris/pull/60116#discussion_r2980531230
########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java: ########## @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.cdc.request.FetchRecordRequest; +import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.util.StreamingJobUtils; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String URI = 
"http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream"; + + public CdcStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException { + validate(properties); + processProps(properties); + } + + private void processProps(Map<String, String> properties) throws AnalysisException { + Map<String, String> copyProps = new HashMap<>(properties); + copyProps.put("format", "json"); + super.parseCommonProperties(copyProps); + this.processedParams.put("enable_cdc_client", "true"); + this.processedParams.put("uri", URI); + this.processedParams.put("http.enable.range.request", "false"); + this.processedParams.put("http.enable.chunk.response", "true"); + this.processedParams.put("http.method", "POST"); + + String payload = generateParams(properties); + this.processedParams.put("http.payload", payload); + this.backendConnectProperties.putAll(processedParams); + generateFileStatus(); + } + + private String generateParams(Map<String, String> properties) throws AnalysisException { + FetchRecordRequest recordRequest = new FetchRecordRequest(); + recordRequest.setJobId(UUID.randomUUID().toString().replace("-", "")); + recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE)); + recordRequest.setConfig(properties); + try { + return objectMapper.writeValueAsString(recordRequest); + } catch (IOException e) { + LOG.info("Failed to serialize fetch record request," + e.getMessage()); + throw new AnalysisException(e.getMessage()); + } + } + + private void validate(Map<String, String> properties) throws AnalysisException { + if (!properties.containsKey(DataSourceConfigKeys.JDBC_URL)) { Review Comment: **[Low]** `validate()` only checks for `jdbc_url`, `type`, `table`, and `offset`, but `getTableColumns()` and the CDC client also require `user`, `password`, `driver_url`, and `driver_class`. If these are missing, the user will get a cryptic JDBC connection error instead of a clear validation message. 
Consider adding checks for these required properties, or at minimum `user` and `password`. ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java: ########## @@ -257,7 +251,7 @@ private PostgresSourceConfig generatePostgresConfig( return configFactory.create(subtaskId); } - private String getSlotName(Long jobId) { + private String getSlotName(String jobId) { return "doris_cdc_" + jobId; Review Comment: **[Low]** `getSlotName()` now concatenates `"doris_cdc_" + jobId` where `jobId` can be a 32-character UUID (from TVF). This produces a 42-character slot name. PostgreSQL replication slot names have a max length of 63 characters (per `NAMEDATALEN`), so this is within bounds, but barely. Worth documenting the constraint here. Also note that UUID-based slot names are cleaned up at the end of `buildStreamRecords()` via `sourceReader.close(fetchRecord)`, which should drop the slot. If the close fails or the process crashes, orphan slots will remain in PostgreSQL. ########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java: ########## @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.cdc.request.FetchRecordRequest; +import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.util.StreamingJobUtils; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String URI = "http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream"; + + public CdcStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException { + validate(properties); + processProps(properties); + } + + private void processProps(Map<String, String> properties) throws AnalysisException { + Map<String, String> copyProps = new HashMap<>(properties); + copyProps.put("format", "json"); + super.parseCommonProperties(copyProps); + this.processedParams.put("enable_cdc_client", "true"); + this.processedParams.put("uri", URI); + this.processedParams.put("http.enable.range.request", "false"); + this.processedParams.put("http.enable.chunk.response", "true"); + this.processedParams.put("http.method", "POST"); + + String payload = generateParams(properties); + this.processedParams.put("http.payload", payload); + this.backendConnectProperties.putAll(processedParams); + generateFileStatus(); + } + + private String generateParams(Map<String, String> properties) 
throws AnalysisException { + FetchRecordRequest recordRequest = new FetchRecordRequest(); + recordRequest.setJobId(UUID.randomUUID().toString().replace("-", "")); + recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE)); + recordRequest.setConfig(properties); + try { + return objectMapper.writeValueAsString(recordRequest); + } catch (IOException e) { + LOG.info("Failed to serialize fetch record request," + e.getMessage()); + throw new AnalysisException(e.getMessage()); Review Comment: **[Medium]** `LOG.info` should be `LOG.warn` or `LOG.error` for a failure path. Using `info` level for an error that triggers an `AnalysisException` is misleading — it will be filtered out in production log analysis: ```java LOG.info("Failed to serialize fetch record request," + e.getMessage()); ``` Should be: ```java LOG.warn("Failed to serialize fetch record request: {}", e.getMessage(), e); ``` Also note the missing space after the comma in the message, and the use of string concatenation instead of parameterized logging. ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java: ########## @@ -51,26 +58,31 @@ import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.SCHEMA_HEARTBEAT_EVENT_KEY_NAME; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import io.debezium.data.Envelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; /** Pipeline coordinator. 
*/ @Component public class PipelineCoordinator { private static final Logger LOG = LoggerFactory.getLogger(PipelineCoordinator.class); private static final String SPLIT_ID = "splitId"; // jobId - private final Map<Long, DorisBatchStreamLoad> batchStreamLoadMap = new ConcurrentHashMap<>(); + private final Map<String, DorisBatchStreamLoad> batchStreamLoadMap = new ConcurrentHashMap<>(); + // taskId, offset + private final Map<String, Map<String, String>> taskOffsetCache = new ConcurrentHashMap<>(); // taskId -> writeFailReason Review Comment: **[Low]** `taskOffsetCache` is a `ConcurrentHashMap` that is populated at the end of `buildStreamRecords` but only cleaned up by `getOffsetWithTaskId()` (via `remove()`). If the caller never retrieves the offset (e.g., due to an error in the query pipeline), entries will accumulate indefinitely. Consider adding a TTL-based eviction or a periodic cleanup mechanism to prevent unbounded memory growth in long-running CDC client processes. ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java: ########## @@ -511,7 +511,7 @@ public void commitOffset( CommitOffsetRequest commitRequest = CommitOffsetRequest.builder() .offset(OBJECT_MAPPER.writeValueAsString(meta)) - .jobId(jobId) + .jobId(Long.parseLong(jobId)) .taskId(Long.parseLong(taskId)) Review Comment: **[Low]** `Long.parseLong(jobId)` will throw `NumberFormatException` if `jobId` is a UUID string (as generated by the TVF path in `CdcStreamTableValuedFunction.generateParams()`). While the TVF streaming path does not go through `DorisBatchStreamLoad.commitOffset()`, this conversion is fragile — if the code paths ever converge, this will fail silently. Consider adding a guard or documenting the assumption clearly. 
########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java: ########## @@ -90,6 +102,159 @@ public PipelineCoordinator() { new ThreadPoolExecutor.AbortPolicy()); } + /** return data for http_file_reader */ + public StreamingResponseBody fetchRecordStream(FetchRecordRequest fetchReq) throws Exception { + SourceReader sourceReader; + SplitReadResult readResult; + try { + if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) { + LOG.info( + "Generate initial meta for fetch record request, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId()); + // means the request did not originate from the job, only tvf + Map<String, Object> meta = generateMeta(fetchReq.getConfig()); + fetchReq.setMeta(meta); + } + + sourceReader = Env.getCurrentEnv().getReader(fetchReq); + readResult = sourceReader.prepareAndSubmitSplit(fetchReq); + } catch (Exception ex) { + throw new CommonException(ex); + } + + return outputStream -> { + try { + buildStreamRecords(sourceReader, fetchReq, readResult, outputStream); + } catch (Exception ex) { + LOG.error( + "Failed fetch record, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId(), + ex); + throw new StreamException(ex); + } + }; + } + + private void buildStreamRecords( + SourceReader sourceReader, + FetchRecordRequest fetchRecord, + SplitReadResult readResult, + OutputStream rawOutputStream) + throws Exception { + SourceSplit split = readResult.getSplit(); + Map<String, String> lastMeta = null; + int rowCount = 0; + BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream); + try { + // Poll records using the existing mechanism + boolean shouldStop = false; + long startTime = System.currentTimeMillis(); + while (!shouldStop) { + Iterator<SourceRecord> recordIterator = sourceReader.pollRecords(); + if (!recordIterator.hasNext()) { + Thread.sleep(100); + long elapsedTime = System.currentTimeMillis() - startTime; Review Comment: **[Nit]** The 
`Thread.sleep(100)` fixed-interval polling loop with timeout is acceptable for the bounded CDC streaming use case, but consider an exponential backoff on the sleep interval (e.g., 100ms → 200ms → 400ms up to a cap) to reduce wakeup frequency when the source is slow to produce records. The timeout (`Constants.POLL_SPLIT_RECORDS_TIMEOUTS`) already bounds the overall wait. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
