This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b1a4be9149 [Fix] logs http basic issue # 9755 (#9968)
b1a4be9149 is described below
commit b1a4be9149c6d35252fa4e986e3c2d3cf69e26a2
Author: dotfive-star <[email protected]>
AuthorDate: Sat Nov 8 21:32:28 2025 +0800
[Fix] logs http basic issue # 9755 (#9968)
---
.../engine/server/rest/service/BaseLogService.java | 82 ++++++--
.../engine/server/rest/service/LogService.java | 20 +-
.../engine/server/rest/RestApiHttpBasicTest.java | 217 +++++++++++++++++++++
3 files changed, 302 insertions(+), 17 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
index a1e84f9ec7..ce95965250 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
@@ -29,13 +29,19 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
@Slf4j
public class BaseLogService extends BaseService {
+
public BaseLogService(NodeEngineImpl nodeEngine) {
super(nodeEngine);
}
+ private static final String AUTHORIZATION_HEADER = "Authorization";
+ private static final String BASIC_PREFIX = "Basic ";
+
/** Get configuration log path */
public String getLogPath() {
try {
@@ -46,31 +52,83 @@ public class BaseLogService extends BaseService {
}
}
+ /**
+ * Send a simple HTTP GET request.
+ *
+ * @param urlString url
+ * @return the response body as a string, or {@code null} if the request
failed
+ */
protected String sendGet(String urlString) {
+ return sendGet(urlString, null, null);
+ }
+
+ /**
+ * Send GET request (optionally with Basic Auth)
+ *
+ * @param urlString url
+ * @param user username, nullable
+ * @param pass password, nullable
+ * @return the response body as a string, or {@code null} if the request
failed
+ */
+ protected String sendGet(String urlString, String user, String pass) {
+ HttpURLConnection connection = null;
try {
- HttpURLConnection connection = (HttpURLConnection) new
URL(urlString).openConnection();
+ connection = (HttpURLConnection) new
URL(urlString).openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(5000);
connection.setReadTimeout(5000);
+
+ // Basic Auth
+ if (user != null && pass != null) {
+ String auth = user + ":" + pass;
+ String token =
+
Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
+ connection.setRequestProperty(AUTHORIZATION_HEADER,
BASIC_PREFIX + token);
+ }
+
connection.connect();
- if (connection.getResponseCode() == 200) {
- try (InputStream is = connection.getInputStream();
- ByteArrayOutputStream baos = new
ByteArrayOutputStream()) {
- byte[] buffer = new byte[1024];
- int len;
- while ((len = is.read(buffer)) != -1) {
- baos.write(buffer, 0, len);
- }
- return baos.toString();
- }
+ int code = connection.getResponseCode();
+ if (code == HttpURLConnection.HTTP_OK) {
+ return readResponseBody(connection.getInputStream());
+ } else {
+ log.warn("GET {} -> HTTP {}", urlString, code);
+ drainErrorStream(connection);
}
} catch (IOException e) {
- log.error("Send get Fail.{}", ExceptionUtils.getMessage(e));
+ log.error("Send GET failed: url={}, err={}", urlString,
ExceptionUtils.getMessage(e));
+ } finally {
+ if (connection != null) {
+ connection.disconnect();
+ }
}
return null;
}
+ private String readResponseBody(InputStream is) throws IOException {
+ try (InputStream input = is;
+ ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+
+ byte[] buf = new byte[4096];
+ int len;
+ while ((len = input.read(buf)) != -1) {
+ output.write(buf, 0, len);
+ }
+ return output.toString(StandardCharsets.UTF_8.name());
+ }
+ }
+
+ private void drainErrorStream(HttpURLConnection connection) throws
IOException {
+ try (InputStream err = connection.getErrorStream()) {
+ if (err != null) {
+ byte[] buffer = new byte[1024];
+ while (err.read(buffer) != -1) {
+ // discard
+ }
+ }
+ }
+ }
+
public String getLogParam(String uri, String contextPath) {
uri = uri.substring(uri.indexOf(contextPath) + contextPath.length());
uri = StringUtil.stripTrailingSlash(uri).substring(1);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java
index d112ae069b..013b08fdec 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java
@@ -70,15 +70,25 @@ public class LogService extends BaseLogService {
systemMonitoringInformation -> {
String host =
systemMonitoringInformation.asObject().get("host").asString();
String url = "http://" + host + ":" + port + contextPath;
- String allName = sendGet(url + REST_URL_GET_ALL_LOG_NAME);
+ String logUrl = url + REST_URL_GET_ALL_LOG_NAME;
+
+ String allName =
+ httpConfig.isEnableBasicAuth()
+ ? sendGet(
+ logUrl,
+ httpConfig.getBasicAuthUsername(),
+ httpConfig.getBasicAuthPassword())
+ : sendGet(logUrl);
+
if (StringUtils.isBlank(allName)) {
log.warn(
- "Get log file name failed: response logName is
blank. url: {}, response: {}",
- url + REST_URL_GET_ALL_LOG_NAME,
- allName);
+ "GET {} returned empty body (null/empty). Skip
this node.", logUrl);
return;
}
- log.debug("Request: {} , Result: {}", url, allName);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Request: {} , Result: {}", url, allName);
+ }
ArrayNode jsonNodes = JsonUtils.parseArray(allName);
jsonNodes.forEach(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java
new file mode 100644
index 0000000000..0117325913
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.server.HttpConfig;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.internal.serialization.Data;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Base64.Encoder;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_LOGS;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OVERVIEW;
+
+/** Test for Rest API with Basic. */
+class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest {
+
+ private static final int HTTP_PORT = 18081;
+ private static final Long JOB_1 = System.currentTimeMillis() + 1L;
+ private static final String USER = "admin";
+ private static final String PASS = "admin";
+ private static final String DOMAIN = "http://localhost:" + HTTP_PORT;
+
+ private static final String AUTHORIZATION_HEADER = "Authorization";
+ private static final String BASIC_PREFIX = "Basic ";
+
+ @BeforeAll
+ void setUp() {
+ String name = this.getClass().getName();
+ Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
+ hazelcastConfig.setClusterName(
+ TestUtils.getClusterName("RestApiServletHttpBasicTest_" +
name));
+ SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
+ seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
+ seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL);
+
+ HttpConfig httpConfig =
seaTunnelConfig.getEngineConfig().getHttpConfig();
+ httpConfig.setEnabled(Boolean.TRUE);
+ httpConfig.setPort(HTTP_PORT);
+
+ httpConfig.setEnableBasicAuth(Boolean.TRUE);
+ httpConfig.setBasicAuthUsername(USER);
+ httpConfig.setBasicAuthPassword(PASS);
+
+ instance =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+ nodeEngine = instance.node.nodeEngine;
+ server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+ LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
+ }
+
+ @AfterAll
+ public void after() {
+ // Disable basic auth
+ // Because of the ConfigProvider.locateAndGetSeaTunnelConfig()
single-case,
+ // if you change, other use cases will also change
+ // managed via
org.apache.seatunnel.engine.common.config.YamlSeaTunnelDomConfigProcessor
+ SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
+ HttpConfig httpConfig =
seaTunnelConfig.getEngineConfig().getHttpConfig();
+ httpConfig.setEnableBasicAuth(Boolean.FALSE);
+ httpConfig.setBasicAuthUsername("");
+ httpConfig.setBasicAuthPassword("");
+ }
+
+ @Test
+ public void testRestApiOverview() throws Exception {
+ HttpURLConnection conn = null;
+ try {
+ URL url = new URL(DOMAIN + REST_URL_OVERVIEW);
+ conn = (HttpURLConnection) url.openConnection();
+ setBasicAuth(conn);
+
+ Assertions.assertEquals(200, conn.getResponseCode());
+ Assertions.assertTrue(
+ conn.getHeaderFields()
+ .get("Content-Type")
+ .toString()
+ .contains("charset=utf-8"));
+ } finally {
+ if (conn != null) {
+ conn.disconnect();
+ }
+ }
+ }
+
+ @Test
+ void testLogRestApiResponseFailure() throws IOException {
+ startJob();
+ HttpURLConnection conn = null;
+ try {
+ URL url = new URL(DOMAIN + REST_URL_LOGS + "?format=JSON");
+ conn = (HttpURLConnection) url.openConnection();
+
+ Assertions.assertEquals(401, conn.getResponseCode());
+ } finally {
+ if (conn != null) {
+ conn.disconnect();
+ }
+ }
+ }
+
+ @Test
+ void testLogRestApiResponseSuccess() throws IOException {
+ startJob();
+ testLogRestApiResponse("JSON");
+ }
+
+ public void setBasicAuth(HttpURLConnection connection) {
+ // Basic Auth
+ Encoder encoder = Base64.getEncoder();
+ String auth = USER + ":" + PASS;
+ String token =
encoder.encodeToString(auth.getBytes(StandardCharsets.UTF_8));
+ connection.setRequestProperty(AUTHORIZATION_HEADER, BASIC_PREFIX +
token);
+ }
+
+ public void testLogRestApiResponse(String format) throws IOException {
+ HttpURLConnection conn = null;
+ try {
+ URL url = new URL(DOMAIN + REST_URL_LOGS + "?format=" + format);
+ conn = (HttpURLConnection) url.openConnection();
+ setBasicAuth(conn);
+
+ Assertions.assertEquals(200, conn.getResponseCode());
+ Assertions.assertTrue(
+ conn.getHeaderFields()
+ .get("Content-Type")
+ .toString()
+ .contains("charset=utf-8"));
+
+ try (BufferedReader in =
+ new BufferedReader(new
InputStreamReader(conn.getInputStream()))) {
+ // [ {
+ // "node" : "localhost:18080",
+ // "logLink" :
"http://localhost:18080/logs/job-1760939539658.log",
+ // "logName" : "job-1760939539658.log"
+ // }, {
+ // "node" : "localhost:18080",
+ // "logLink" :
"http://localhost:18080/logs/job-${ctx:ST-JID}.log",
+ // "logName" : "job-${ctx:ST-JID}.log"
+ // } ]
+ String response = in.lines().collect(Collectors.joining());
+ Assertions.assertFalse(StringUtils.isBlank(response));
+ }
+
+ } finally {
+ if (conn != null) {
+ conn.disconnect();
+ }
+ }
+ }
+
+ private void startJob() {
+ LogicalDag testLogicalDag =
+ TestUtils.createTestLogicalPlan(
+ "fake_to_console.conf",
+ RestApiHttpBasicTest.JOB_1.toString(),
+ RestApiHttpBasicTest.JOB_1);
+
+ JobImmutableInformation jobImmutableInformation =
+ new JobImmutableInformation(
+ RestApiHttpBasicTest.JOB_1,
+ "Test",
+ nodeEngine.getSerializationService(),
+ testLogicalDag,
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ Data data =
nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+ PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+ server.getCoordinatorService()
+ .submitJob(
+ RestApiHttpBasicTest.JOB_1,
+ data,
+
jobImmutableInformation.isStartWithSavePoint());
+ voidPassiveCompletableFuture.join();
+ }
+}