github-actions[bot] commented on code in PR #61626:
URL: https://github.com/apache/doris/pull/61626#discussion_r2973590744
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java:
##########
@@ -0,0 +1,135 @@
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.doris.sdk.load.model.RespContent;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Executes a single HTTP PUT request and parses the Doris response.
+ * Thread-safe: CloseableHttpClient is shared and reused across calls.
+ *
+ * Uses LaxRedirectStrategy so that 307 redirects on PUT requests are
+ * followed automatically (Doris FE redirects stream load to BE).
+ */
+public class StreamLoader implements AutoCloseable {
+
+ private static final Logger log =
LoggerFactory.getLogger(StreamLoader.class);
+ private static final int SOCKET_TIMEOUT_MS = 9 * 60 * 1000;
+ private static final int CONNECT_TIMEOUT_MS = 60_000;
+
+ private final HttpClientBuilder httpClientBuilder;
+ private final ObjectMapper objectMapper;
+
+ public StreamLoader() {
+ this.httpClientBuilder = buildHttpClient();
+ this.objectMapper = new ObjectMapper();
+ }
+
+ /** Package-private constructor for testing with a mock HTTP client. */
+ StreamLoader(HttpClientBuilder httpClientBuilder) {
+ this.httpClientBuilder = httpClientBuilder;
+ this.objectMapper = new ObjectMapper();
+ }
+
+ /**
+ * Executes the HTTP PUT request and returns a LoadResponse.
+ *
+ * @throws StreamLoadException for retryable HTTP-level errors (non-200
status, connection failure)
+ * @throws IOException for unrecoverable I/O errors
+ */
+ public LoadResponse execute(HttpPut request) throws IOException {
+ log.debug("Sending HTTP PUT to {}", request.getURI());
+ long start = System.currentTimeMillis();
+
+ try (CloseableHttpClient httpClient = httpClientBuilder.build();
+ CloseableHttpResponse response = httpClient.execute(request)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ log.debug("HTTP response status: {}", statusCode);
+ log.debug("HTTP request completed in {} ms",
System.currentTimeMillis() - start);
+
+ if (statusCode == 200) {
+ return parseResponse(response);
+ } else {
+ // Non-200 is retryable (e.g. 503, 429)
+ throw new StreamLoadException("stream load error: " +
response.getStatusLine().toString());
+ }
+ } catch (StreamLoadException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new StreamLoadException("stream load request failed: " +
e.getMessage(), e);
+ }
+ }
+
+ private LoadResponse parseResponse(CloseableHttpResponse response) throws
IOException {
+ byte[] bodyBytes = EntityUtils.toByteArray(response.getEntity());
+ String body = new String(bodyBytes, StandardCharsets.UTF_8);
+ log.info("Stream Load Response: {}", body);
+
+ RespContent resp = objectMapper.readValue(body, RespContent.class);
+
+ if (isSuccess(resp.getStatus())) {
+ log.info("Load operation completed successfully");
+ return LoadResponse.success(resp);
+ } else {
+ log.error("Load operation failed with status: {}",
resp.getStatus());
+ String errorMsg;
+ if (resp.getMessage() != null && !resp.getMessage().isEmpty()) {
+ errorMsg = "load failed. cause by: " + resp.getMessage()
+ + ", please check more detail from url: " +
resp.getErrorUrl();
+ } else {
+ errorMsg = body;
+ }
+ return LoadResponse.failure(resp, errorMsg);
+ }
+ }
+
+ private static boolean isSuccess(String status) {
+ return "success".equalsIgnoreCase(status);
+ }
+
+ private static HttpClientBuilder buildHttpClient() {
+ try {
+ RequestConfig requestConfig = RequestConfig.custom()
+ .setConnectTimeout(CONNECT_TIMEOUT_MS)
+ .setSocketTimeout(SOCKET_TIMEOUT_MS)
+ .setConnectionRequestTimeout(CONNECT_TIMEOUT_MS)
+ .build();
+
+ return HttpClientBuilder.create()
+ .setDefaultRequestConfig(requestConfig)
+ .setRedirectStrategy(new DefaultRedirectStrategy() {
+ @Override
+ protected boolean isRedirectable(String method) {
+ return true;
+ }
+ })
+ .setSSLSocketFactory(
+ new SSLConnectionSocketFactory(
+ SSLContextBuilder.create()
+ .loadTrustMaterial(null, (chain,
authType) -> true)
+ .build(),
+ NoopHostnameVerifier.INSTANCE));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to build HTTP client", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
Review Comment:
**[Bug]** `close()` is a no-op, but `DorisLoadClient.close()` delegates to
`streamLoader.close()`. Once the HTTP client is stored as a field (as
recommended in the review comment on `StreamLoader.execute()`), this method
must close it. As-is, if anyone uses the `try-with-resources` pattern shown in
the examples and README, no resources are actually released.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/DorisLoadClient.java:
##########
@@ -0,0 +1,206 @@
+package org.apache.doris.sdk.load;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.internal.RequestBuilder;
+import org.apache.doris.sdk.load.internal.StreamLoader;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Thread-safe Doris stream load client.
+ *
+ * <p>Usage:
+ * <pre>
+ * DorisLoadClient client = new DorisLoadClient(config);
+ * LoadResponse resp = client.load(new ByteArrayInputStream(data));
+ * if (resp.getStatus() == LoadResponse.Status.SUCCESS) { ... }
+ * </pre>
+ *
+ * <p>Thread safety: this instance can be shared across threads.
+ * Each {@link #load} call must receive an independent InputStream.
+ */
+public class DorisLoadClient implements AutoCloseable {
+
+ private static final Logger log =
LoggerFactory.getLogger(DorisLoadClient.class);
+ /** Absolute maximum for a single backoff interval: 5 minutes. */
+ private static final long ABSOLUTE_MAX_INTERVAL_MS = 300_000L;
+
+ private final DorisConfig config;
+ private final StreamLoader streamLoader;
+
+ public DorisLoadClient(DorisConfig config) {
+ this.config = config;
+ this.streamLoader = new StreamLoader();
+ }
+
+ /** Package-private constructor for testing with a mock StreamLoader. */
+ DorisLoadClient(DorisConfig config, StreamLoader streamLoader) {
+ this.config = config;
+ this.streamLoader = streamLoader;
+ }
+
+ /**
+ * Loads data from the given InputStream into Doris via stream load.
+ * The InputStream is fully consumed and buffered before the first attempt.
+ * Retries with exponential backoff on retryable errors (network/HTTP
failures).
+ * Business failures (bad data, schema mismatch, auth) are returned
immediately without retry.
+ *
+ * @param inputStream data to load (consumed once; must not be shared
across threads)
+ * @return LoadResponse with status SUCCESS or FAILURE
+ * @throws IOException if the stream cannot be read or all retries are
exhausted
+ */
+ public LoadResponse load(InputStream inputStream) throws IOException {
+ int maxRetries = 6;
+ long baseIntervalMs = 1000L;
+ long maxTotalTimeMs = 60000L;
+
+ if (config.getRetry() != null) {
+ maxRetries = config.getRetry().getMaxRetryTimes();
+ baseIntervalMs = config.getRetry().getBaseIntervalMs();
+ maxTotalTimeMs = config.getRetry().getMaxTotalTimeMs();
+ }
+
+ log.info("Starting stream load: {}.{}", config.getDatabase(),
config.getTable());
+
+ // Buffer the InputStream once so retries can replay the body
+ byte[] bodyData = readAll(inputStream);
+
+ // Compress once before the retry loop (avoids re-compressing on each
retry)
+ if (config.isEnableGzip()) {
+ bodyData = gzipCompress(bodyData);
+ }
+ Exception lastException = null;
+ LoadResponse lastResponse = null;
+ long totalRetryTimeMs = 0L;
+ long operationStart = System.currentTimeMillis();
+
+ for (int attempt = 0; attempt <= maxRetries; attempt++) {
+ if (attempt > 0) {
Review Comment:
**[Bug] Retry timeout enforcement is asymmetric and uses sleep-time instead
of wall-clock time.**
`totalRetryTimeMs` (line 82) only accumulates sleep/backoff time. Actual
HTTP request execution time is never accounted for. This means:
- If `maxTotalTimeMs = 60s` and each request takes 30s before throwing
`StreamLoadException`, 7 attempts could take ~273s total (210s of request time
+ 63s of sleep), but `totalRetryTimeMs` would only show ~63s.
Additionally, the `StreamLoadException` catch block (line 115) has no
wall-clock time check, while the generic `Exception` catch block (line 119)
does check `System.currentTimeMillis() - operationStart`. This asymmetry means
`StreamLoadException` retries can continue well past the time limit, stopping
only when the retry count or the sleep-time budget is exhausted.
**Fix**: Replace `totalRetryTimeMs` tracking with wall-clock elapsed time,
and apply the same timeout check uniformly in both catch blocks:
```java
long elapsed = System.currentTimeMillis() - operationStart;
if (maxTotalTimeMs > 0 && elapsed > maxTotalTimeMs) {
log.warn("Total elapsed time exceeded limit, stopping retries.");
break;
}
```
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java:
##########
@@ -0,0 +1,135 @@
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.doris.sdk.load.model.RespContent;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Executes a single HTTP PUT request and parses the Doris response.
+ * Thread-safe: CloseableHttpClient is shared and reused across calls.
+ *
+ * Uses LaxRedirectStrategy so that 307 redirects on PUT requests are
+ * followed automatically (Doris FE redirects stream load to BE).
+ */
+public class StreamLoader implements AutoCloseable {
+
+ private static final Logger log =
LoggerFactory.getLogger(StreamLoader.class);
+ private static final int SOCKET_TIMEOUT_MS = 9 * 60 * 1000;
+ private static final int CONNECT_TIMEOUT_MS = 60_000;
+
+ private final HttpClientBuilder httpClientBuilder;
+ private final ObjectMapper objectMapper;
+
+ public StreamLoader() {
+ this.httpClientBuilder = buildHttpClient();
+ this.objectMapper = new ObjectMapper();
+ }
+
+ /** Package-private constructor for testing with a mock HTTP client. */
+ StreamLoader(HttpClientBuilder httpClientBuilder) {
+ this.httpClientBuilder = httpClientBuilder;
+ this.objectMapper = new ObjectMapper();
+ }
+
+ /**
+ * Executes the HTTP PUT request and returns a LoadResponse.
+ *
+ * @throws StreamLoadException for retryable HTTP-level errors (non-200
status, connection failure)
+ * @throws IOException for unrecoverable I/O errors
+ */
+ public LoadResponse execute(HttpPut request) throws IOException {
+ log.debug("Sending HTTP PUT to {}", request.getURI());
+ long start = System.currentTimeMillis();
+
+ try (CloseableHttpClient httpClient = httpClientBuilder.build();
+ CloseableHttpResponse response = httpClient.execute(request)) {
Review Comment:
**[Performance Bug]** A new `CloseableHttpClient` is created on every
`execute()` call via `httpClientBuilder.build()`. This is a significant
performance anti-pattern:
1. Each `build()` creates a new connection manager, SSL context, and
connection pool.
2. Connection reuse (HTTP keep-alive) across calls is impossible since the
client is immediately closed.
3. For a single `load()` with retries (up to 7 attempts), this creates and
destroys 7 separate HTTP clients.
The Javadoc at line 24 claims *"Thread-safe: CloseableHttpClient is shared
and reused across calls"* — this is incorrect; neither sharing nor reuse
happens.
**Fix**: Build the `CloseableHttpClient` once in the constructor, store it
as a field, reuse it in `execute()`, and close it in `close()`:
```java
private final CloseableHttpClient httpClient;
public StreamLoader() {
this.httpClient = buildHttpClient().build();
this.objectMapper = new ObjectMapper();
}
public LoadResponse execute(HttpPut request) throws IOException {
try (CloseableHttpResponse response = httpClient.execute(request)) {
// ...
}
}
@Override
public void close() throws IOException {
httpClient.close();
}
```
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/DorisLoadClient.java:
##########
@@ -0,0 +1,206 @@
+package org.apache.doris.sdk.load;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.internal.RequestBuilder;
+import org.apache.doris.sdk.load.internal.StreamLoader;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Thread-safe Doris stream load client.
+ *
+ * <p>Usage:
+ * <pre>
+ * DorisLoadClient client = new DorisLoadClient(config);
+ * LoadResponse resp = client.load(new ByteArrayInputStream(data));
+ * if (resp.getStatus() == LoadResponse.Status.SUCCESS) { ... }
+ * </pre>
+ *
+ * <p>Thread safety: this instance can be shared across threads.
+ * Each {@link #load} call must receive an independent InputStream.
+ */
+public class DorisLoadClient implements AutoCloseable {
+
+ private static final Logger log =
LoggerFactory.getLogger(DorisLoadClient.class);
+ /** Absolute maximum for a single backoff interval: 5 minutes. */
+ private static final long ABSOLUTE_MAX_INTERVAL_MS = 300_000L;
+
+ private final DorisConfig config;
+ private final StreamLoader streamLoader;
+
+ public DorisLoadClient(DorisConfig config) {
+ this.config = config;
+ this.streamLoader = new StreamLoader();
+ }
+
+ /** Package-private constructor for testing with a mock StreamLoader. */
+ DorisLoadClient(DorisConfig config, StreamLoader streamLoader) {
+ this.config = config;
+ this.streamLoader = streamLoader;
+ }
+
+ /**
+ * Loads data from the given InputStream into Doris via stream load.
+ * The InputStream is fully consumed and buffered before the first attempt.
+ * Retries with exponential backoff on retryable errors (network/HTTP
failures).
+ * Business failures (bad data, schema mismatch, auth) are returned
immediately without retry.
+ *
+ * @param inputStream data to load (consumed once; must not be shared
across threads)
+ * @return LoadResponse with status SUCCESS or FAILURE
+ * @throws IOException if the stream cannot be read or all retries are
exhausted
+ */
+ public LoadResponse load(InputStream inputStream) throws IOException {
+ int maxRetries = 6;
+ long baseIntervalMs = 1000L;
+ long maxTotalTimeMs = 60000L;
+
+ if (config.getRetry() != null) {
+ maxRetries = config.getRetry().getMaxRetryTimes();
+ baseIntervalMs = config.getRetry().getBaseIntervalMs();
+ maxTotalTimeMs = config.getRetry().getMaxTotalTimeMs();
+ }
+
+ log.info("Starting stream load: {}.{}", config.getDatabase(),
config.getTable());
+
+ // Buffer the InputStream once so retries can replay the body
+ byte[] bodyData = readAll(inputStream);
+
+ // Compress once before the retry loop (avoids re-compressing on each
retry)
+ if (config.isEnableGzip()) {
+ bodyData = gzipCompress(bodyData);
+ }
+ Exception lastException = null;
+ LoadResponse lastResponse = null;
+ long totalRetryTimeMs = 0L;
+ long operationStart = System.currentTimeMillis();
+
+ for (int attempt = 0; attempt <= maxRetries; attempt++) {
+ if (attempt > 0) {
+ log.info("Retry attempt {}/{}", attempt, maxRetries);
+ long backoff = calculateBackoffMs(attempt, baseIntervalMs,
maxTotalTimeMs, totalRetryTimeMs);
+
+ if (maxTotalTimeMs > 0 && totalRetryTimeMs + backoff >
maxTotalTimeMs) {
+ log.warn("Next retry backoff ({}ms) would exceed total
limit ({}ms). Stopping.", backoff, maxTotalTimeMs);
+ break;
+ }
+
+ log.info("Waiting {}ms before retry (total retry time so far:
{}ms)", backoff, totalRetryTimeMs);
+ sleep(backoff);
+ totalRetryTimeMs += backoff;
+ } else {
+ log.info("Initial load attempt");
+ }
+
+ try {
+ HttpPut request = RequestBuilder.build(config, bodyData,
attempt);
+ lastResponse = streamLoader.execute(request);
+
+ if (lastResponse.getStatus() == LoadResponse.Status.SUCCESS) {
+ log.info("Stream load succeeded on attempt {}", attempt +
1);
+ return lastResponse;
+ }
+
+ // Business failure (bad data, schema mismatch, auth) — do not
retry
+ log.error("Load failed (non-retryable): {}",
lastResponse.getErrorMessage());
+ return lastResponse;
+
+ } catch (StreamLoadException e) {
+ // Retryable: network error, HTTP 5xx, etc.
+ lastException = e;
+ log.error("Attempt {} failed with retryable error: ", attempt
+ 1, e);
+ } catch (Exception e) {
+ // Wrap unexpected exceptions as retryable
Review Comment:
**[Bug] Asymmetric error handling between `StreamLoadException` and generic
`Exception`.**
The `StreamLoadException` catch (line 115) simply records the exception and
lets the loop continue to the next iteration's backoff check. But this generic
`Exception` catch has two additional guards:
1. `attempt == maxRetries` check (line 124) — breaks immediately on last
retry
2. Wall-clock elapsed time check (lines 130-133)
The `StreamLoadException` path lacks both of these. Since
`StreamLoadException` is the expected retryable error (connection failures,
HTTP 5xx), it's the more common retry path and should have the same safeguards.
Also, wrapping generic `Exception` as retryable at line 121 is questionable
— `NullPointerException`, `IllegalArgumentException`, etc. are programming
errors that should not be retried.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/RequestBuilder.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Builds HttpPut requests for Doris stream load.
+ * Handles header assembly, label generation, and group commit logic.
+ */
+public class RequestBuilder {
+
+ private static final Logger log =
LoggerFactory.getLogger(RequestBuilder.class);
+ private static final String STREAM_LOAD_PATTERN =
"http://%s/api/%s/%s/_stream_load";
+ private static final Random RANDOM = new Random();
Review Comment:
**[Bug] HTTPS endpoints silently downgraded to HTTP.**
`STREAM_LOAD_PATTERN` hardcodes `http://`, while `pickEndpoint()` (lines
138-142) strips both `http://` and `https://` prefixes. If a user configures
`https://fe1:8030`, the scheme is stripped and the URL is then rebuilt as
`http://fe1:8030/api/...`, silently downgrading to unencrypted HTTP.
**Fix**: Preserve the original scheme from the endpoint, or reconstruct the
URL using the endpoint directly:
```java
private static final String STREAM_LOAD_PATTERN =
"%s/api/%s/%s/_stream_load";
// In build():
String endpoint = pickEndpoint(config.getEndpoints()); // keep scheme
String url = String.format(STREAM_LOAD_PATTERN, endpoint,
config.getDatabase(), config.getTable());
```
And remove the scheme-stripping from `pickEndpoint()`.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/DorisClient.java:
##########
@@ -0,0 +1,65 @@
+package org.apache.doris.sdk;
+
Review Comment:
**[Compliance] Missing Apache Software Foundation license header.** This
file, 12 other core source files, and 5 test files are missing the ASF license
header that is required for Apache projects. The example files include it
correctly. All `.java` files should have the license header.
Missing from:
- `DorisClient.java`, `DorisLoadClient.java`, `StreamLoader.java`,
`RequestBuilder.java`
- `DorisConfig.java`, `CsvFormat.java`, `JsonFormat.java`, `Format.java`,
`GroupCommitMode.java`, `RetryConfig.java`
- `StreamLoadException.java`, `LoadResponse.java`, `RespContent.java`
- All 5 test files
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]