This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 62dc78fbb00 [fix](logstash) Replace HttpClient5 async with HttpClient4
sync to fix CircularRedirectException (1.2.0 -> 1.2.1) (#63181)
62dc78fbb00 is described below
commit 62dc78fbb00b1e8617f8a379ca9dd5dc250a32c1
Author: bingquanzhao <[email protected]>
AuthorDate: Mon Jun 8 12:07:25 2026 +0800
[fix](logstash) Replace HttpClient5 async with HttpClient4 sync to fix
CircularRedirectException (1.2.0 -> 1.2.1) (#63181)
Replace HttpClient5 async with HttpClient4 sync to fix
CircularRedirectException (1.2.0 -> 1.2.1)
The logstash-output-doris plugin uses Apache HttpClient5 async client to
PUT stream load requests. Against SelectDB Cloud / BYOC FE — which
returns '307 + Connection: close' on stream load — the async client
fails with CircularRedirectException under any meaningful concurrency /
body size.
Root cause:
1. HC5 async does not strictly block body transmission while waiting for
'100 Continue'. When FE returns 307 before issuing 100, the entity
producer has already started writing; FE closing the connection then
yields an IOException mid-transfer.
2. HC5 default exec chain wraps RedirectExec around
AsyncHttpRequestRetryExec. The recoverable IOException triggers an
internal retry that re-enters the same FE -> 307 path, but
RedirectLocations from the first attempt is still populated, so the same
BE URL is detected as 'already visited' and reported as a circular
redirect.
This is a real HC5-vs-HC4 implementation difference, not a configuration
issue. The Doris Flink connector also follows FE 307 to BE in its
default path (autoRedirect=true) and works correctly precisely because
it uses HC4 sync: HC4 honors 'Expect: 100-continue' strictly, so when FE
307s without sending 100, the entity is left unconsumed and HC4's
RedirectExec follows the redirect normally.
This patch aligns the plugin with the Flink connector's HTTP layer:
- bump gem version 1.2.0 -> 1.2.1
- httpclient5 5.4.2 (async) -> httpclient 4.5.13 (sync)
- SimpleRequestBuilder -> HttpPut + ByteArrayEntity (repeatable)
- HttpAsyncClients defaults -> HttpClients with:
* setRequestExecutor(HttpRequestExecutor(60s))
* setRedirectStrategy(DorisRedirectStrategy) (isRedirectable=true,
strip userinfo, normalize empty query)
* setRetryHandler(DefaultHttpRequestRetryHandler(0, false))
* setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
* RequestConfig.setExpectContinueEnabled(true)
- Async future plumbing in TableEvents replaced with sync
response_code / response_body / response_error fields.
- Stringify both key and value at request.addHeader call site: HC4's
addHeader(String, String) is strict on types whereas HC5 had a
permissive (String, Object) overload; user configs commonly carry
Float / Integer values like 'max_filter_ratio => 1.0'.
- Drop 's.requirements << jar ...' from gemspec: with JARs vendored
under
lib/, the maven lookup at install time is unnecessary and forced users
to set JARS_SKIP=true for offline installs.
Pipeline configuration, retry queue, save_on_failure, group_commit,
label generation, header handling - all unchanged.
Verified on a SelectDB BYOC cluster mirroring the reported production
shape (16 workers x 10000 batch x 200,000 events):
- Before: 100% requests fail with CircularRedirectException
- After: 20/20 stream loads Status=Success, 200,000/200,000 rows
ingested, 0 HTTP-layer errors.
---
extension/logstash/README.md | 50 +++++++++++--
extension/logstash/lib/logstash/outputs/doris.rb | 89 ++++++++++++++++++------
extension/logstash/logstash-output-doris.gemspec | 4 +-
3 files changed, 116 insertions(+), 27 deletions(-)
diff --git a/extension/logstash/README.md b/extension/logstash/README.md
index 34feb45db26..c29d4c1ca40 100644
--- a/extension/logstash/README.md
+++ b/extension/logstash/README.md
@@ -17,12 +17,52 @@ specific language governing permissions and limitations
under the License.
-->
-1. How to build
+# logstash-output-doris
- `gem build logstash-output-doris.gemspec`
+A Logstash output plugin that ships events to Doris via stream load.
-2. How to use
+Docs: <https://doris.apache.org/docs/dev/ecosystem/logstash>
- `https://doris.apache.org/zh-CN/docs/dev/ecosystem/logstash`
- `https://doris.apache.org/docs/dev/ecosystem/logstash`
+## Build
+Prerequisites: JRuby (>= 9.4 with Java 21, or 9.2 with Java 8/11) and the
+`jar-dependencies` gem (`jruby -S gem install jar-dependencies`).
+
+```bash
+# 1. Vendor HttpClient4 + transitive jars into lib/ and generate the loader.
+# Reads the 'jar' entry from logstash-output-doris.gemspec.
+jruby -e "require 'jars/installer'; Jars::Installer.new.vendor_jars"
+
+# 2. Build the gem.
+jruby -S gem build logstash-output-doris.gemspec
+```
+
+Produces `logstash-output-doris-<version>-java.gem`.
+
+## Install
+
+The jars are already vendored inside the gem, so the install hook does not
+need to talk to Maven — pass `JARS_SKIP=true` to skip the lookup:
+
+```bash
+JARS_SKIP=true $LS_HOME/bin/logstash-plugin install \
+ logstash-output-doris-<version>-java.gem
+```
+
+### Air-gapped install (offline pack)
+
+For a Logstash host without internet access, build an offline pack on a
+connected host first:
+
+```bash
+# On a connected host (same Logstash major version as the target):
+JARS_SKIP=true $LS_HOME/bin/logstash-plugin install \
+ logstash-output-doris-<version>-java.gem
+$LS_HOME/bin/logstash-plugin prepare-offline-pack \
+ --output logstash-output-doris-<version>-offline-pack.zip \
+ logstash-output-doris
+
+# Then on the air-gapped target:
+$LS_HOME/bin/logstash-plugin install \
+ file:///path/to/logstash-output-doris-<version>-offline-pack.zip
+```
diff --git a/extension/logstash/lib/logstash/outputs/doris.rb
b/extension/logstash/lib/logstash/outputs/doris.rb
index 86922036c58..b1e28eb4ae2 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -33,9 +33,14 @@ require 'java'
require "#{File.dirname(__FILE__)}/../../logstash-output-doris_jars.rb"
class LogStash::Outputs::Doris < LogStash::Outputs::Base
- include_package 'org.apache.hc.client5.http.impl.async'
- include_package 'org.apache.hc.client5.http.async.methods'
- include_package 'org.apache.hc.core5.http'
+ java_import 'org.apache.http.client.methods.HttpPut'
+ java_import 'org.apache.http.entity.ByteArrayEntity'
+ java_import 'org.apache.http.util.EntityUtils'
+ java_import 'org.apache.http.impl.client.HttpClients'
+ java_import 'org.apache.http.impl.client.DefaultHttpRequestRetryHandler'
+ java_import 'org.apache.http.impl.NoConnectionReuseStrategy'
+ java_import 'org.apache.http.protocol.HttpRequestExecutor'
+ java_import 'org.apache.http.client.config.RequestConfig'
# support multi thread concurrency for performance
# so multi_receive() and function it calls are all stateless and thread safe
@@ -96,11 +101,19 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
:http_hosts => @http_hosts)
end
- class DorisRedirectStrategy <
Java::org.apache.hc.client5.http.impl.DefaultRedirectStrategy
+ class DorisRedirectStrategy <
Java::org.apache.http.impl.client.DefaultRedirectStrategy
+ # allow redirect for all methods (PUT/POST) - HC4 only allows GET/HEAD
by default
+ def isRedirectable(method)
+ true
+ end
+
def getLocationURI(request, response, context)
uri = super(request, response, context)
# remove user info in redirect uri
- java.net.URI.new(uri.getScheme, nil, uri.getHost, uri.getPort,
uri.getPath, uri.getQuery, uri.getFragment)
+ # normalize empty query string ("?" with nothing after) to no query
at all
+ query = uri.getQuery
+ query = nil if query == ""
+ java.net.URI.new(uri.getScheme, nil, uri.getHost, uri.getPort,
uri.getPath, query, uri.getFragment)
end
end
@@ -109,8 +122,21 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
end
def register
- @client =
HttpAsyncClients.custom.setRedirectStrategy(DorisRedirectStrategy.new).build
- @client.start
+ # HttpClient 4.5.13 sync — same setup as Doris Flink connector
(HttpUtil.java)
+ # Key points:
+ # - setRequestExecutor(60s) : long wait for 100-continue, FE
may delay 307 under load
+ # - setRedirectStrategy : follow 307 on PUT (default
DefaultRedirectStrategy refuses)
+ # - DefaultHttpRequestRetryHandler(0, false) : NO retries -> avoid
spurious CircularRedirect
+ # - NoConnectionReuseStrategy : one connection per request, dodge
keep-alive half-close
+ # - setExpectContinueEnabled(true) : critical -> HC4 waits for 100, FE
307s before body is sent,
+ # entity stays unconsumed,
RedirectExec follows successfully
+ @client = HttpClients.custom
+ .setRequestExecutor(HttpRequestExecutor.new(60_000))
+ .setRedirectStrategy(DorisRedirectStrategy.new)
+ .setRetryHandler(DefaultHttpRequestRetryHandler.new(0, false))
+ .setConnectionReuseStrategy(NoConnectionReuseStrategy::INSTANCE)
+
.setDefaultRequestConfig(RequestConfig.custom.setExpectContinueEnabled(true).build)
+ .build
@request_headers = make_request_headers
@logger.info("request headers: ", @request_headers)
@@ -262,11 +288,11 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
response = ""
if stat == STAT_SUCCESS
- begin
- response = table_events.response_future.get.getBodyText
- rescue => e
- log_failure("doris stream load request error: #{e}")
+ if table_events.response_error
+ log_failure("doris stream load request error:
#{table_events.response_error}")
stat = STAT_RETRY
+ else
+ response = table_events.response_body
end
end
@@ -339,15 +365,33 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
end
@logger.debug("doris stream load request body:
#{table_events.documents}")
- request = SimpleRequestBuilder.
- put(url).
- setBody(table_events.documents, ContentType::TEXT_PLAIN).
- build
+ request = HttpPut.new(url)
+ # ByteArrayEntity: known content-length, repeatable -> safe across
307 retries.
+ # Combined with Expect:100-continue at the client config level, body
is held back
+ # until FE decides; FE 307 short-circuits without consuming the
entity.
+
request.setEntity(ByteArrayEntity.new(table_events.documents.to_java_bytes))
+ # HC4's addHeader(String, String) is strict on types — users commonly
write
+ # non-string values like `max_filter_ratio => 1.0` (Float) or
`timeout => 1200`
+ # (Integer) in their pipeline config. HC5 tolerated this via an
(String, Object)
+ # overload; HC4 does not. Stringify both sides to keep that
ergonomics.
table_events.http_headers.each do |k, v|
- request.addHeader(k, v)
+ request.addHeader(k.to_s, v.to_s)
end
- table_events.response_future = @client.execute(request, nil)
+ # Sync execute. Capture body + status into table_events so
handle_request stays
+ # mostly unchanged. Exceptions are stashed for the same flow to
interpret as RETRY.
+ begin
+ response = @client.execute(request)
+ begin
+ table_events.response_code =
response.getStatusLine.getStatusCode
+ entity = response.getEntity
+ table_events.response_body = entity.nil? ? "" :
EntityUtils.toString(entity)
+ ensure
+ response.close
+ end
+ rescue => e
+ table_events.response_error = e
+ end
end
end # def make_request
@@ -423,7 +467,8 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
end
class TableEvents
- attr_accessor :table, :http_headers, :events, :events_count, :documents,
:req_count, :response_future
+ attr_accessor :table, :http_headers, :events, :events_count, :documents,
:req_count,
+ :response_code, :response_body, :response_error
def initialize(table, http_headers)
@table = table
@@ -434,12 +479,16 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
@documents = ""
@req_count = 1
- @response_future = nil
+ @response_code = nil
+ @response_body = nil
+ @response_error = nil
end
def prepare_retry
@req_count += 1
- @response_future = nil
+ @response_code = nil
+ @response_body = nil
+ @response_error = nil
end
end
diff --git a/extension/logstash/logstash-output-doris.gemspec
b/extension/logstash/logstash-output-doris.gemspec
index ba94518d279..daa23cdb39f 100644
--- a/extension/logstash/logstash-output-doris.gemspec
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -18,7 +18,7 @@ under the License.
=end
Gem::Specification.new do |s|
s.name = 'logstash-output-doris'
- s.version = '1.2.0'
+ s.version = '1.2.1'
s.author = 'Apache Doris'
s.email = '[email protected]'
s.homepage = 'http://doris.apache.org'
@@ -46,5 +46,5 @@ Gem::Specification.new do |s|
s.add_development_dependency 'sinatra', '~> 1.4'
s.add_development_dependency 'webrick', '~> 1.9'
- s.requirements << 'jar org.apache.httpcomponents.client5, httpclient5, 5.4.2'
+ s.requirements << 'jar org.apache.httpcomponents, httpclient, 4.5.13'
end
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]