Copilot commented on code in PR #7654:
URL: https://github.com/apache/incubator-seata/pull/7654#discussion_r2432706727


##########
console/src/main/java/org/apache/seata/mcp/service/impl/MCPRPCServiceImpl.java:
##########
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.mcp.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONException;
+import com.alibaba.fastjson.JSONObject;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import org.apache.seata.common.result.SingleResult;
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.console.config.WebSecurityConfig;
+import org.apache.seata.console.utils.JwtTokenUtils;
+import org.apache.seata.mcp.annotation.Tool;
+import org.apache.seata.mcp.entity.pojo.NameSpaceDetail;
+import org.apache.seata.mcp.handler.CustomResponseErrorHandler;
+import org.apache.seata.mcp.service.MCPRPCService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import 
org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.util.UriComponentsBuilder;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+
+import javax.annotation.PostConstruct;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Service
+public class MCPRPCServiceImpl implements MCPRPCService {
+    @Autowired
+    private Environment env;
+
+    @Autowired
+    private JwtTokenUtils jwtTokenUtils;
+
+    private final RestTemplate restTemplate = new RestTemplate();
+
+    private final String NAMING_SPACE_URL = "http://127.0.0.1:%s";;

Review Comment:
   Hard-coded localhost IP address should be configurable. Consider using a 
configuration property instead of the hard-coded 127.0.0.1 to support different 
deployment scenarios.



##########
console/src/main/java/org/apache/seata/mcp/store/SqlExecutionTemplate.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.mcp.store;
+
+import org.apache.seata.common.exception.StoreException;
+import org.apache.seata.common.result.PageResult;
+import org.apache.seata.common.util.IOUtil;
+import org.apache.seata.common.util.PageUtil;
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.mcp.entity.vo.UndoLogVO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+@Service
+public class SqlExecutionTemplate {
+
+    private static final Pattern SELECT_PATTERN =
+            Pattern.compile("^\\s*SELECT\\b.*", Pattern.CASE_INSENSITIVE | 
Pattern.DOTALL);
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SqlExecutionTemplate.class);
+
+    private DataSource getDataSource(String resourceId) {
+        try {
+            return DataSourceFactory.getDataSource(resourceId);
+        } catch (Exception e) {
+            LOGGER.error("Failed to get the data source, resourceId: {}", 
resourceId, e);
+            throw new StoreException("Unable to get the data source: " + 
resourceId);
+        }
+    }
+
+    private boolean validateQuerySql(String sql) {
+        if (sql == null || StringUtils.isBlank(sql)) {
+            return false;
+        }
+        return SELECT_PATTERN.matcher(sql).matches();
+    }
+
+    public List<Map<String, Object>> query(String resourceId, String sql, 
Object... params) {
+        Connection conn = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+
+        try {
+            if (!validateQuerySql(sql)) {
+                throw new StoreException("The query valid failed,Only query 
operations are allowed:" + sql);
+            }
+            conn = getConnection(resourceId);
+            if (params == null || params.length == 0) {
+                if ((sql.contains("where") || sql.contains("WHERE"))) {
+                    throw new StoreException(
+                            "Query contains WHERE clause but no parameters 
were provided. This may lead to unintended full table scans and is not 
allowed.");
+                }
+            }
+            ps = conn.prepareStatement(sql);
+            if (params != null) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+
+            rs = ps.executeQuery();
+            ResultSetMetaData metaData = rs.getMetaData();
+            int columnCount = metaData.getColumnCount();
+
+            List<Map<String, Object>> results = new ArrayList<>();
+            while (rs.next()) {
+                Map<String, Object> row = new HashMap<>();
+                for (int i = 1; i <= columnCount; i++) {
+                    String columnName = metaData.getColumnLabel(i);
+                    Object value = rs.getObject(i);
+                    row.put(columnName, value);
+                }
+                results.add(row);
+            }
+
+            return results;
+        } catch (SQLException e) {
+            LOGGER.error("The query failed, resourceId: {}, sql: {}", 
resourceId, sql, e);
+            throw new StoreException("The query execution failed: " + 
e.getMessage());
+        } finally {
+            LOGGER.info("User query business datasource with sql: {}", sql);
+            closeResources(rs, ps, conn);
+        }
+    }
+
+    public PageResult<UndoLogVO> queryForUndoLogs(
+            String resourceId, String sql, Integer pageNum, Integer pageSize, 
Object... params) {
+        Connection conn = null;
+        PreparedStatement ps = null;
+        PreparedStatement countPs = null;
+        ResultSet rs = null;
+        ResultSet countRs = null;
+        List<UndoLogVO> data = new ArrayList<>();
+        int count = 0;
+
+        try {
+            if (!validateQuerySql(sql)) {
+                throw new StoreException("The query valid failed,Only query 
operations are allowed:" + sql);
+            }
+            conn = getConnection(resourceId);
+            if (params == null || params.length == 0) {
+                if ((sql.contains("where") || sql.contains("WHERE"))) {
+                    sql = sql.replaceAll("(?i)\\bWHERE\\b.*", "").trim();

Review Comment:
   Stripping the WHERE clause when no parameters are provided silently turns a 
filtered query into a full table scan, which can cause performance issues. This 
approach is risky and should be reconsidered, or at least documented with clear 
warnings.
   ```suggestion
                       throw new StoreException("Query contains WHERE clause 
but no parameters were provided. Removing WHERE clauses can lead to full table 
scans and performance issues. Please provide parameters or revise your query.");
   ```



##########
server/src/main/java/org/apache/seata/server/console/impl/file/ServerLogFileServiceImpl.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.console.impl.file;
+
+import org.apache.seata.common.ConfigurationKeys;
+import org.apache.seata.server.console.entity.param.ServerLogParam;
+import org.apache.seata.server.console.service.ServerLogService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Service;
+import 
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
+
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+
+@Service
+public class ServerLogFileServiceImpl implements ServerLogService {
+
+    @Autowired
+    private Environment env;
+
+    private static final String DEFAULT_APP_NAME = "seata-server";
+
+    private static final Integer MAX_LOG_FILE_SIZE = 500 * 1024 * 1024; // 
500MB
+
+    private final Logger LOGGER = 
LoggerFactory.getLogger(ServerLogFileServiceImpl.class);
+
+    @Override
+    public ResponseEntity<StreamingResponseBody> 
getServerLogFile(ServerLogParam serverLogParam) {
+        String logPathString = buildLogFilePath(serverLogParam);
+        Path logPath = Paths.get(logPathString);
+        if (Files.exists(logPath)) {
+            long size = 0;
+            long modifyTime = 0;
+            try {
+                modifyTime = Files.getLastModifiedTime(logPath).toMillis();
+                size = Files.size(logPath);
+            } catch (IOException e) {
+                LOGGER.warn("Error get log file size: {}", e.getMessage());
+            }
+            if (size > MAX_LOG_FILE_SIZE) {
+                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
+                        .body(out ->
+                                out.write(("Log File exceed the Max Size: " + 
MAX_LOG_FILE_SIZE + " B").getBytes()));
+            }
+            Long lastModifyTime = serverLogParam.getLastModifyTime();
+            long finalCurSize;
+            if (lastModifyTime == 0 || modifyTime - lastModifyTime > 86400000) 
{
+                finalCurSize = 0;
+            } else {
+                finalCurSize = serverLogParam.getCurSize();
+            }
+            long finalSize = size;
+            if (finalCurSize > size) {
+                return ResponseEntity.status(HttpStatus.BAD_REQUEST)
+                        .body(out ->
+                                out.write(("Remote Log File is newer than TC, 
please check the log file").getBytes()));
+            }
+            StreamingResponseBody responseBody = outputStream -> {
+                try (FileChannel channel = FileChannel.open(logPath, 
StandardOpenOption.READ)) {
+                    long position = finalCurSize;
+                    long remaining = finalSize;
+
+                    while (remaining > 0) {
+                        long transferred = channel.transferTo(position, 
remaining, Channels.newChannel(outputStream));
+
+                        if (transferred <= 0) {
+                            break;
+                        }
+
+                        position += transferred;
+                        remaining -= transferred;
+
+                        outputStream.flush();
+                    }
+                } catch (IOException e) {
+                    LOGGER.warn("Error streaming log file: {}", 
e.getMessage());
+                    if (e instanceof ClosedChannelException) {
+                        LOGGER.info("Client closed connection during file 
transfer");
+                    }
+                }
+            };
+            return ResponseEntity.ok()
+                    .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; 
filename=\"" + logPath.getFileName() + "\"")
+                    .header(HttpHeaders.CACHE_CONTROL, "no-cache, no-store, 
must-revalidate")
+                    .header(HttpHeaders.PRAGMA, "no-cache")
+                    .header(HttpHeaders.EXPIRES, "0")
+                    .header("X-APPEND-NEEDED", finalCurSize == 0 ? "false" : 
"true")
+                    .contentType(MediaType.APPLICATION_JSON)

Review Comment:
   Setting content type to APPLICATION_JSON for a log file download is 
incorrect. Log files are typically plain text, so MediaType.TEXT_PLAIN would be 
more appropriate, or use MediaType.APPLICATION_OCTET_STREAM for binary file 
downloads.
   ```suggestion
                       .contentType(MediaType.TEXT_PLAIN)
   ```



##########
console/src/main/java/org/apache/seata/mcp/service/impl/ServerLogServiceImpl.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.mcp.service.impl;
+
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.mcp.entity.constant.RPCConstant;
+import org.apache.seata.mcp.entity.param.ServerLogParam;
+import org.apache.seata.mcp.entity.pojo.NameSpaceDetail;
+import org.apache.seata.mcp.entity.vo.ServerLogPageVO;
+import org.apache.seata.mcp.service.MCPRPCService;
+import org.apache.seata.mcp.service.ServerLogService;
+import org.apache.seata.mcp.utils.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Service
+public class ServerLogServiceImpl implements ServerLogService {
+
+    @Autowired
+    private MCPRPCService mcprpcService;
+
+    // TODO: Support breakpoint download
+    private static final int SERVER_LOG_PAGE_SIZE = 2500;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerLogServiceImpl.class);
+    private static final DateTimeFormatter LOG_TIMESTAMP_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+    private static final ReentrantReadWriteLock fileLock = new 
ReentrantReadWriteLock();
+
+    @Override
+    public ServerLogPageVO<String> analyseServerLogFile(NameSpaceDetail 
nameSpaceDetail, ServerLogParam param) {
+        checkLogParam(param);
+
+        int pageNum = param.getPage();
+        int skipCounts = (pageNum - 1) * SERVER_LOG_PAGE_SIZE;
+
+        Pattern timestampPattern = 
Pattern.compile("^(\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\.\\d{3})");
+
+        String filePath = getLogFilePath(nameSpaceDetail, param);
+
+        OptimizedPageCollector collector;
+        collector = processLargeFile(filePath, param, timestampPattern, 
skipCounts);
+
+        return ServerLogPageVO.success(
+                collector.getPageEntries(),
+                collector.getTotalCount(),
+                collector.canDetermineNextPage(),
+                pageNum,
+                SERVER_LOG_PAGE_SIZE);
+    }
+
+    private OptimizedPageCollector processLargeFile(
+            String filePath, ServerLogParam param, Pattern timestampPattern, 
int skipCounts) {
+        OptimizedPageCollector collector =
+                new OptimizedPageCollector(param, timestampPattern, 
skipCounts, SERVER_LOG_PAGE_SIZE);
+
+        try (BufferedReader reader = 
Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)) {
+            String line;
+            while ((line = reader.readLine()) != null && 
!collector.isComplete()) {
+                collector.processLine(line);
+            }
+            collector.finish();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read large log file", e);
+        }
+
+        return collector;
+    }
+
+    private String getLogFilePath(NameSpaceDetail nameSpaceDetail, 
ServerLogParam param) {
+        String namespace = nameSpaceDetail.getNamespace();
+        String vGroup = nameSpaceDetail.getvGroup();
+        String cluster = nameSpaceDetail.getCluster();
+        String key = namespace + "." + vGroup + "." + cluster + "." + 
param.getLogType();
+
+        String fileName = key + "-Server.log";
+        Path filePath = Paths.get(System.getProperty("user.home"), "logs", 
"seata", "console", "tmp", fileName);
+
+        fileLock.writeLock().lock();
+        try {
+            if (Files.exists(filePath)) {
+                try {
+                    BasicFileAttributes attrs = Files.readAttributes(filePath, 
BasicFileAttributes.class);
+                    
param.setLastModifyTime(attrs.lastModifiedTime().toMillis());
+                    param.setCurSize(attrs.size());
+                } catch (IOException e) {
+                    LOGGER.warn("Failed to recheck file attributes: {}", 
filePath, e);
+                }
+            }
+
+            Files.createDirectories(filePath.getParent());
+            downloadLogFile(nameSpaceDetail, filePath.toString(), 
param).block();
+
+            return filePath.toString();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create directories", e);
+        } finally {
+            fileLock.writeLock().unlock();
+        }
+    }
+
+    private Mono<Void> downloadLogFile(NameSpaceDetail nameSpaceDetail, String 
outputFilePath, ServerLogParam param) {
+        return mcprpcService.getCallTCLogs(
+                nameSpaceDetail,
+                RPCConstant.SERVER_LOG_BASE_URL + "/getCurrentServerLogFile",
+                param,
+                null,
+                null,
+                outputFilePath);
+    }
+
+    private void checkLogParam(ServerLogParam logParam) {
+        Integer page = logParam.getPage();
+        List<String> logMessageKeyWord = logParam.getLogMessageKeyWord();
+        String logType = logParam.getLogType();
+        String logMessageStartTime = logParam.getLogMessageStartTime();
+        String logMessageEndTime = logParam.getLogMessageEndTime();
+        String logMessageLevel = logParam.getLogMessageLevel();
+
+        if ((logMessageKeyWord == null || logMessageKeyWord.isEmpty())
+                && StringUtils.isBlank(logMessageStartTime)
+                && (StringUtils.isBlank(logType) || 
logType.equalsIgnoreCase("all"))
+                && StringUtils.isBlank(logMessageLevel)
+                && StringUtils.isBlank(logMessageEndTime)) {
+            throw new IllegalArgumentException(
+                    "It is not allowed to query log data of type all without 
any conditions (except page and logType).");
+        }
+
+        if (StringUtils.isNotBlank(logMessageStartTime) && 
StringUtils.isBlank(logMessageEndTime)) {
+            throw new IllegalArgumentException("It is not allowed to determine 
the start time without the end time");
+        }
+
+        if (page == null || page < 1) {
+            throw new IllegalArgumentException("The page number must be 
greater than or equal to 1");
+        }
+
+        if (StringUtils.isNotBlank(logType)) {
+            if (!Arrays.asList("all", "error", 
"warn").contains(logType.toLowerCase())) {
+                throw new IllegalArgumentException(
+                        "The logType parameter value is invalid and must be: 
all, error, warn");
+            }
+        }
+
+        if (StringUtils.isNotBlank(logMessageLevel)) {
+            if (!Arrays.asList("error", "warn", 
"info").contains(logMessageLevel.toLowerCase())) {
+                throw new IllegalArgumentException(
+                        "The logMessageLevel parameter value is invalid and 
must be: error, warn, info");
+            }
+        }
+    }
+
+    private static class OptimizedPageCollector {
+        private final Pattern timestampPattern;
+        private final int skipCounts;
+        private final int pageSize;
+
+        private final Long startTimeMillis;
+        private final Long endTimeMillis;
+
+        private final Set<String> keywordSet;
+        private final String logLevel;
+
+        private StringBuilder currentEntry = new StringBuilder();
+        private final List<String> pageEntries = new ArrayList<>();
+        private int skipped = 0;
+        private boolean hasMoreEntries = false;
+        private boolean complete = false;
+
+        public OptimizedPageCollector(ServerLogParam param, Pattern 
timestampPattern, int skipCounts, int pageSize) {
+            this.timestampPattern = timestampPattern;
+            this.skipCounts = skipCounts;
+            this.pageSize = pageSize;
+
+            if (StringUtils.isNotBlank(param.getLogMessageStartTime())
+                    && StringUtils.isNotBlank(param.getLogMessageEndTime())) {
+                this.startTimeMillis = 
DateUtils.convertToTimeStampFromDateTime(param.getLogMessageStartTime());
+                this.endTimeMillis = 
DateUtils.convertToTimeStampFromDateTime(param.getLogMessageEndTime());
+            } else {
+                this.startTimeMillis = null;
+                this.endTimeMillis = null;
+            }
+
+            if (param.getLogMessageKeyWord() != null
+                    && !param.getLogMessageKeyWord().isEmpty()) {
+                this.keywordSet = new HashSet<>(param.getLogMessageKeyWord());
+            } else {
+                this.keywordSet = null;
+            }
+
+            this.logLevel = param.getLogMessageLevel();
+        }
+
+        public void processLine(String line) {
+            if (complete) return;
+
+            if (timestampPattern.matcher(line).find()) {
+                flushCurrentEntry();
+                currentEntry = new StringBuilder(line);
+            } else if (currentEntry.length() > 0) {
+                currentEntry.append("\n").append(line);
+            }
+        }
+
+        public void finish() {
+            flushCurrentEntry();
+            complete = true;
+        }
+
+        public boolean isComplete() {
+            return complete;
+        }
+
+        public boolean canDetermineNextPage() {
+            return pageEntries.size() == pageSize && hasMoreEntries;
+        }
+
+        private void flushCurrentEntry() {
+            if (currentEntry.length() == 0) return;
+
+            String completeEntry = currentEntry.toString().trim();
+            if (matchesFilter(completeEntry)) {
+                if (skipped < skipCounts) {
+                    skipped++;
+                } else if (pageEntries.size() < pageSize) {
+                    pageEntries.add(completeEntry);
+                } else {
+                    hasMoreEntries = true;
+                    complete = true;
+                }
+            }
+            currentEntry.setLength(0);
+        }
+
+        private boolean matchesFilter(String entry) {
+            if (logLevel != null && 
!entry.toUpperCase().contains(logLevel.toUpperCase())) {
+                return false;
+            }
+
+            if (keywordSet != null && !keywordSet.isEmpty()) {
+                for (String key : keywordSet) {
+                    if (org.apache.commons.lang.StringUtils.indexOf(entry, 
key) == -1) {

Review Comment:
   Using Apache Commons Lang StringUtils.indexOf() instead of the standard 
String.contains() method adds an unnecessary dependency and reduces readability. 
Consider using entry.contains(key) instead.
   ```suggestion
                       if (!entry.contains(key)) {
   ```



##########
namingserver/src/main/resources/application.yml:
##########
@@ -28,10 +36,67 @@ logging:
 heartbeat:
   threshold: 90000
   period: 60000
+console:
+  user:
+    password: seata
+    username: seata
 seata:
   security:
     secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
     tokenValidityInMilliseconds: 1800000
     csrf-ignore-urls: /naming/v1/**,/api/v1/naming/**
     ignore:
       urls: 
/,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/version.json,/naming/v1/health,/error
+
+  # MCP Server Configuration
+  mcp:
+    # Server name (do not set the default to seata-mcp-server)
+    serverName: seata-mcp-server
+    serverVersion: 2.0.0
+    # MCP service transmission types, available in sse and streamable
+    mcpType: streamable
+
+    # sse type properties
+#    sse:
+#      # SSE endpoint (no default/SSE set)
+#      sseEndpoint: /sse
+#      # message endpoint (no default /message set)
+#      messageEndpoint: /message
+
+    # streamable type properties
+    streamable:
+      mcpEndpoint: /mcp
+      heartBeatSecondDuration: 30
+
+    # Query limits the maximum time interval, The unit is milliseconds, 
Default seven days
+    query:
+      max_query_duration: 604800000
+    # Whether to enable permission authentication
+    auth:
+      enabled: true
+
+  # Business data source configuration
+  businessDataSources:
+    dataSource1:
+      # Whether this data source is enabled
+      enabled: true
+      dbType: mysql
+      driverClassName: com.mysql.cj.jdbc.Driver # Currently, only mysql is 
supported
+      url: jdbc:mysql://:3306/order1?useSSL=false

Review Comment:
   The database URLs are missing a hostname and set useSSL=false, which disables 
SSL encryption. This exposes database connections to potential security risks. 
Consider using proper hostnames and enabling SSL.



##########
core/src/main/java/org/apache/seata/core/rpc/netty/http/ChannelOutputStreamAdapter.java:
##########
@@ -0,0 +1,37 @@
+package org.apache.seata.core.rpc.netty.http;

Review Comment:
   Missing Apache license header. All source files should include the standard 
Apache Software Foundation license header for consistency with the project's 
licensing requirements.



##########
console/src/main/java/org/apache/seata/mcp/store/SqlExecutionTemplate.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.mcp.store;
+
+import org.apache.seata.common.exception.StoreException;
+import org.apache.seata.common.result.PageResult;
+import org.apache.seata.common.util.IOUtil;
+import org.apache.seata.common.util.PageUtil;
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.mcp.entity.vo.UndoLogVO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+@Service
+public class SqlExecutionTemplate {
+
+    private static final Pattern SELECT_PATTERN =
+            Pattern.compile("^\\s*SELECT\\b.*", Pattern.CASE_INSENSITIVE | 
Pattern.DOTALL);
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SqlExecutionTemplate.class);
+
+    private DataSource getDataSource(String resourceId) {
+        try {
+            return DataSourceFactory.getDataSource(resourceId);
+        } catch (Exception e) {
+            LOGGER.error("Failed to get the data source, resourceId: {}", 
resourceId, e);
+            throw new StoreException("Unable to get the data source: " + 
resourceId);
+        }
+    }
+
+    private boolean validateQuerySql(String sql) {
+        if (sql == null || StringUtils.isBlank(sql)) {
+            return false;
+        }
+        return SELECT_PATTERN.matcher(sql).matches();
+    }
+
+    public List<Map<String, Object>> query(String resourceId, String sql, 
Object... params) {
+        Connection conn = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+
+        try {
+            if (!validateQuerySql(sql)) {
+                throw new StoreException("The query valid failed,Only query 
operations are allowed:" + sql);
+            }
+            conn = getConnection(resourceId);
+            if (params == null || params.length == 0) {
+                if ((sql.contains("where") || sql.contains("WHERE"))) {
+                    throw new StoreException(
+                            "Query contains WHERE clause but no parameters 
were provided. This may lead to unintended full table scans and is not 
allowed.");

Review Comment:
   The validation logic is inconsistent between regular queries (lines 77-80) 
and undo log queries (line 131). Regular queries throw an exception for WHERE 
clauses without parameters, while undo log queries silently remove the WHERE 
clause. This inconsistency could lead to confusion and security issues.



##########
namingserver/src/main/resources/application.yml:
##########
@@ -28,10 +36,67 @@ logging:
 heartbeat:
   threshold: 90000
   period: 60000
+console:
+  user:
+    password: seata
+    username: seata
 seata:
   security:
     secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
     tokenValidityInMilliseconds: 1800000
     csrf-ignore-urls: /naming/v1/**,/api/v1/naming/**
     ignore:
       urls: 
/,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/version.json,/naming/v1/health,/error
+
+  # MCP Server Configuration
+  mcp:
+    # Server name (defaults to seata-mcp-server if not set)
+    serverName: seata-mcp-server
+    serverVersion: 2.0.0
+    # MCP transport type; available values: sse, streamable
+    mcpType: streamable
+
+    # sse type properties
+#    sse:
+#      # SSE endpoint (defaults to /sse if not set)
+#      sseEndpoint: /sse
+#      # Message endpoint (defaults to /message if not set)
+#      messageEndpoint: /message
+
+    # streamable type properties
+    streamable:
+      mcpEndpoint: /mcp
+      heartBeatSecondDuration: 30
+
+    # Maximum time span allowed for a query, in milliseconds (default: seven days)
+    query:
+      max_query_duration: 604800000
+    # Whether to enable permission authentication
+    auth:
+      enabled: true
+
+  # Business data source configuration
+  businessDataSources:
+    dataSource1:
+      # Whether this data source is enabled
+      enabled: true
+      dbType: mysql
+      driverClassName: com.mysql.cj.jdbc.Driver # Currently, only mysql is 
supported
+      url: jdbc:mysql://:3306/order1?useSSL=false
+      username: root
+      password: root
+      datasource: druid # Optional connection pool type (druid/hikari/dbcp)
+      minConn: 5
+      maxConn: 50
+      maxWait: 5000
+    dataSource2:
+      enabled: true
+      dbType: mysql
+      driverClassName: com.mysql.cj.jdbc.Driver
+      url: jdbc:mysql://:3306/order2?useSSL=false

Review Comment:
   Database URLs are missing hostname and have useSSL=false which disables SSL 
encryption. This exposes database connections to potential security risks. 
Consider using proper hostnames and enabling SSL.



##########
console/src/main/java/org/apache/seata/mcp/service/impl/BusinessDataSourceServiceImpl.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.mcp.service.impl;
+
+import org.apache.seata.common.exception.StoreException;
+import org.apache.seata.common.result.PageResult;
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.mcp.entity.constant.SqlConstant;
+import org.apache.seata.mcp.entity.param.UndoLogParam;
+import org.apache.seata.mcp.entity.pojo.MCPProperties;
+import org.apache.seata.mcp.entity.vo.UndoLogVO;
+import org.apache.seata.mcp.service.BusinessDataSourceService;
+import org.apache.seata.mcp.store.SqlExecutionTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Service
+public class BusinessDataSourceServiceImpl implements 
BusinessDataSourceService {
+
+    @Autowired
+    private SqlExecutionTemplate sqlExecutionTemplate;
+
+    @Autowired
+    private MCPProperties mcpProperties;
+
+    /**
+     * Lists the tables of the schema resolved from the given resource id.
+     *
+     * @param resourceId resource id used both to resolve the schema name and to run the query
+     * @return one entry per returned row, formatted as {@code TABLE_NAME (TABLE_COMMENT)}
+     * @throws StoreException if no schema name can be resolved for {@code resourceId}
+     */
+    @Override
+    public List<String> getTableNamesBySchema(String resourceId) {
+        String schemaName = getSchemaNameByResourceId(resourceId);
+        if (StringUtils.isBlank(schemaName)) {
+            throw new StoreException("failed to get schema by resourceId: " + resourceId);
+        }
+        List<Map<String, Object>> rows =
+                sqlExecutionTemplate.query(resourceId, SqlConstant.GET_TABLE_NAME_SQL, schemaName);
+        List<String> described = new ArrayList<>(rows.size());
+        for (Map<String, Object> row : rows) {
+            String name = String.valueOf(row.get("TABLE_NAME"));
+            String comment = String.valueOf(row.get("TABLE_COMMENT"));
+            described.add(name + " (" + comment + ")");
+        }
+        return described;
+    }
+
+    /**
+     * Runs {@code SqlConstant.GET_SCHEMA_SQL} for one table of the schema resolved
+     * from the given resource id.
+     *
+     * @param resourceId resource id used both to resolve the schema name and to run the query
+     * @param tableName  table name bound as the second query parameter
+     * @return the rows returned by the query, one map per row
+     * @throws StoreException if no schema name can be resolved for {@code resourceId}
+     */
+    @Override
+    public List<Map<String, Object>> getTableSchemaByTableName(String resourceId, String tableName) {
+        String schemaName = getSchemaNameByResourceId(resourceId);
+        if (StringUtils.isBlank(schemaName)) {
+            throw new StoreException("failed to get schema by resourceId: " + resourceId);
+        }
+        return sqlExecutionTemplate.query(resourceId, SqlConstant.GET_SCHEMA_SQL, schemaName, tableName);
+    }
+
+    /**
+     * Runs an ad-hoc query against the given resource, refusing any statement that
+     * mentions {@code undo_log}.
+     *
+     * @param sql        the SQL text to execute
+     * @param resourceId resource id the query is executed against
+     * @return the rows returned by {@code sqlExecutionTemplate.query}, one map per row
+     * @throws StoreException if {@code sql} mentions {@code undo_log} in any letter case
+     */
+    @Override
+    public List<Map<String, Object>> runSql(String sql, String resourceId) {
+        // Compare case-insensitively: a plain substring test lets "UNDO_LOG" slip through.
+        // A null sql is passed on unchanged and left to sqlExecutionTemplate.query to handle.
+        if (sql != null && sql.toLowerCase(java.util.Locale.ROOT).contains("undo_log")) {
+            throw new StoreException(
+                    "Do not use SQL to query undo_log data; use analyzeUndoLog to query and analyze undo_log");
+        }
+        return sqlExecutionTemplate.query(resourceId, sql);
+    }
+
+    @Override
+    public PageResult<UndoLogVO> getUndoLogInfo(UndoLogParam param) {
+        long max_time_duration = mcpProperties.getQueryDuration();
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss");
+        String sql = SqlConstant.GET_UNDO_LOG_SQL;
+        List<Object> params = new ArrayList<>();
+        String branchId = param.getBranchId();
+        String xid = param.getXid();
+        String resourceId = param.getResourceId();
+        Integer logStatus = param.getLogStatus();
+        UndoLogParam.CreateTime logCreateTime = param.getLogCreateTime();
+        UndoLogParam.ModifyTime logModifiedTime = param.getLogModifiedTime();
+        int pageNum = param.getPageNum();
+        int pageSize = param.getPageSize();
+        int offset = getOffsetAndValidationPageQuerySql(pageNum, pageSize);
+        if (StringUtils.isBlank(resourceId)) {
+            throw new StoreException("you cannot query without resourceId");
+        }
+        int paramCounts = 0;
+        if (StringUtils.isNotBlank(branchId)) {
+            sql += SqlConstant.PARAM_BRANCH_ID_SQL;
+            params.add(branchId);
+            paramCounts++;
+        }
+        if (StringUtils.isNotBlank(xid)) {
+            sql += SqlConstant.PARAM_XID_SQL;
+            params.add(xid);
+            paramCounts++;
+        }
+        if (logStatus != null) {
+            sql += SqlConstant.UNDO_LOG_STATUS_SQL;
+            params.add(logStatus);
+            paramCounts++;
+        }
+        boolean containsTimeDuration = false;
+        if (logCreateTime != null) {
+            String startTime = logCreateTime.getStartTime();
+            String endTime = logCreateTime.getEndTime();
+            if (startTime != null && endTime != null) {
+                sql += SqlConstant.UNDO_LOG_CREATE_TIME_SQL;
+                containsTimeDuration = true;
+                Long startTimestamp = LocalDateTime.parse(startTime, formatter)
+                        .atZone(ZoneId.systemDefault())
+                        .toInstant()
+                        .toEpochMilli();
+                Long endTimestamp = LocalDateTime.parse(endTime, formatter)
+                        .atZone(ZoneId.systemDefault())
+                        .toInstant()
+                        .toEpochMilli();
+                if (endTimestamp - startTimestamp > max_time_duration) {
+                    throw new StoreException(
+                            "The query time span is not allowed to exceed the 
max query duration(milliseconds): "
+                                    + max_time_duration);
+                }
+            }
+            if (startTime != null) {
+                params.add(startTime);
+            }
+            if (endTime != null) {
+                params.add(endTime);
+            }
+        }
+        if (logModifiedTime != null) {
+            String startTime = logModifiedTime.getStartTime();
+            String endTime = logModifiedTime.getEndTime();
+            if (startTime != null && endTime != null) {
+                if (containsTimeDuration) {
+                    sql += " AND" + SqlConstant.UNDO_LOG_MODIFY_TIME_SQL;
+                } else {
+                    sql += SqlConstant.UNDO_LOG_MODIFY_TIME_SQL;
+                    containsTimeDuration = true;
+                }
+                Long startTimestamp = LocalDateTime.parse(startTime, formatter)
+                        .atZone(ZoneId.systemDefault())
+                        .toInstant()
+                        .toEpochMilli();
+                Long endTimestamp = LocalDateTime.parse(endTime, formatter)
+                        .atZone(ZoneId.systemDefault())
+                        .toInstant()
+                        .toEpochMilli();
+                if (endTimestamp - startTimestamp > max_time_duration) {
+                    throw new StoreException(
+                            "The query time span is not allowed to exceed the 
max query duration(milliseconds): "
+                                    + max_time_duration);
+                }
+            }
+            if (startTime != null) {
+                params.add(startTime);
+            }
+            if (endTime != null) {
+                params.add(endTime);
+            }
+        }
+        if (containsTimeDuration) {
+            for (int i = 0; i < paramCounts; i++) {
+                sql = sql.replaceFirst("#", "AND");
+            }

Review Comment:
   Using placeholder '#' replacement for SQL construction is fragile and 
error-prone. Consider using a more robust SQL builder pattern or prepared 
statement approach with proper parameter binding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to