This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 32804430d70 improve logstash doris output plugin (#33135)
32804430d70 is described below
commit 32804430d70d07c5c12755646a014e63cac93be8
Author: Kang <[email protected]>
AuthorDate: Mon Apr 22 15:00:40 2024 +0800
improve logstash doris output plugin (#33135)
1. support multi-threaded concurrency for better performance
2. support a configurable retry count, including infinite retry
3. add a config option to log the doris stream load request headers and response
4. add a config option to log load speed periodically for better observability
---
extension/logstash/lib/logstash/outputs/doris.rb | 316 ++++++++++++-----------
extension/logstash/logstash-output-doris.gemspec | 4 +-
2 files changed, 167 insertions(+), 153 deletions(-)
diff --git a/extension/logstash/lib/logstash/outputs/doris.rb
b/extension/logstash/lib/logstash/outputs/doris.rb
index b7334aefb5f..34124f446bb 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -23,21 +23,21 @@ require "logstash/namespace"
require "logstash/json"
require "logstash/util/shortname_resolver"
require "uri"
-require "stud/buffer"
require "logstash/plugin_mixins/http_client"
require "securerandom"
require "json"
require "base64"
require "restclient"
+require 'thread'
class LogStash::Outputs::Doris < LogStash::Outputs::Base
- include LogStash::PluginMixins::HttpClient
- include Stud::Buffer
-
- concurrency :single
+ # support multi thread concurrency for performance
+ # so multi_receive() and function it calls are all stateless and thread safe
+ concurrency :shared
config_name "doris"
+
# hosts array of Doris Frontends. eg ["http://fe1:8030", "http://fe2:8030"]
config :http_hosts, :validate => :array, :required => true
# the database which data is loaded to
@@ -45,61 +45,35 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
# the table which data is loaded to
config :table, :validate => :string, :required => true
# label prefix of a stream load requst.
- config :label_prefix, :validate => :string, :required => true
- # user
+ config :label_prefix, :validate => :string, :default => "logstash"
+ # user name
config :user, :validate => :string, :required => true
# password
config :password, :validate => :password, :required => true
- # column separator
- config :column_separator, :validate => :string, :default => ""
- # column mappings. eg: "k1, k2, tmpk3, k3 = tmpk3 + 1"
- config :columns, :validate => :string, :default => ""
- # where predicate to filter data. eg: "k1 > 1 and k3 < 100"
- config :where, :validate => :string, :default => ""
- # max filter ratio
- config :max_filter_ratio, :validate => :number, :default => -1
- # partition which data is loaded to. eg: "p1, p2"
- config :partition, :validate => :array, :default => {}
- # timeout of a stream load, in second
- config :timeout, :validate => :number, :default => -1
- # switch off or on of strict mode
- config :strict_mode, :validate => :string, :default => "false"
- # timezone
- config :timezone, :validate => :string, :default => ""
- # memory limit of a stream load
- config :exec_mem_limit, :validate => :number, :default => -1
- # Specify the format of imported data, csv and json are supported.
- config :format, :validate => ['csv', 'json', 'csv_with_names',
'csv_with_names_and_types', 'parquet', 'orc'], :default => "csv"
- # jsonpaths example: jsonpaths => ["$.id", "$.type", "$.actor.id",
"$.actor.login"]
- config :jsonpaths, :validate => :array, :default => []
- # Specify the root node of the json document
- config :json_root, :validate => :string, :default => ""
- # Boolean, true means the json will be parsed in the first row of the
schema, turn on this option to improve the efficiency of json importing.
- config :fuzzy_parse, :validate => :boolean, :default => false
- # Parse json data converts numeric types to strings.
- config :num_as_string, :validate => :boolean, :default => false
- # true means support for reading one json object per line
- config :read_json_by_line, :validate => :boolean, :default => false
- #
+
+ # use message field only
+ config :message_only, :validate => :boolean, :default => false
+ # field mapping
+ config :mapping, :validate => :hash
# Custom headers to use
# format is `headers => ["X-My-Header", "%{host}"]`
config :headers, :validate => :hash
- config :batch_size, :validate => :number, :default => 100000
-
- config :idle_flush_time, :validate => :number, :default => 20
-
- config :save_on_failure, :validate => :boolean, :default => true
+ config :save_on_failure, :validate => :boolean, :default => false
- config :save_dir, :validate => :string, :default => "/tmp"
+ config :save_dir, :validate => :string, :default => "./"
config :save_file, :validate => :string, :default => "failed.data"
config :host_resolve_ttl_sec, :validate => :number, :default => 120
- config :automatic_retries, :validate => :number, :default => 3
+ config :retry_count, :validate => :number, :default => -1
+
+ config :log_request, :validate => :boolean, :default => false
+
+ config :log_speed_interval, :validate => :number, :default => 10
def print_plugin_info()
@@ -112,23 +86,10 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
:db => @db,
:table => @table,
:label_prefix => @label_prefix,
- :batch_size => @batch_size,
- :idle_flush_time => @idle_flush_time,
:http_hosts => @http_hosts)
end
def register
- # Handle this deprecated option. TODO: remove the option
- #@ssl_certificate_validation = @verify_ssl if @verify_ssl
-
- # We count outstanding requests with this queue
- # This queue tracks the requests to create backpressure
- # When this queue is empty no new requests may be sent,
- # tokens must be added back by the client on success
- #@request_tokens = SizedQueue.new(@pool_max)
- #@pool_max.times {|t| @request_tokens << true }
- #@requests = Array.new
-
@http_query = "/api/#{db}/#{table}/_stream_load"
@hostnames_pool =
@@ -138,11 +99,38 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
@request_headers = make_request_headers
@logger.info("request headers: ", @request_headers)
- buffer_initialize(
- :max_items => @batch_size,
- :max_interval => @idle_flush_time,
- :logger => @logger
- )
+ @init_time = Time.now.to_i # seconds
+ @total_bytes = java.util.concurrent.atomic.AtomicLong.new(0)
+ @total_rows = java.util.concurrent.atomic.AtomicLong.new(0)
+
+ report_thread = Thread.new do
+ last_time = @init_time
+ last_bytes = @total_bytes.get
+ last_rows = @total_rows.get
+ @logger.info("will report speed every #{@log_speed_interval} seconds")
+ while @log_speed_interval > 0
+ sleep(@log_speed_interval)
+
+ cur_time = Time.now.to_i # seconds
+ cur_bytes = @total_bytes.get
+ cur_rows = @total_rows.get
+ total_time = cur_time - @init_time
+ total_speed_mbps = cur_bytes / 1024 / 1024 / total_time
+ total_speed_rps = cur_rows / total_time
+
+ inc_bytes = cur_bytes - last_bytes
+ inc_rows = cur_rows - last_rows
+ inc_time = cur_time - last_time
+ inc_speed_mbps = inc_bytes / 1024 / 1024 / inc_time
+ inc_speed_rps = inc_rows / inc_time
+
+ @logger.info("total #{cur_bytes/1024/1024} MB #{cur_rows} ROWS,
total speed #{total_speed_mbps} MB/s #{total_speed_rps} R/s, last #{inc_time}
seconds speed #{inc_speed_mbps} MB/s #{inc_speed_rps} R/s")
+
+ last_time = cur_time
+ last_bytes = cur_bytes
+ last_rows = cur_rows
+ end
+ end
print_plugin_info()
end # def register
@@ -180,58 +168,83 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
end
end
- # This module currently does not support parallel requests as that would
circumvent the batching
- def receive(event)
- buffer_receive(event)
+ def multi_receive(events)
+ return if events.empty?
+ send_events(events)
end
- public
- def flush(events, close=false)
+ private
+ def send_events(events)
documents = ""
event_num = 0
events.each do |event|
- documents << event.get("[message]") << "\n"
+ documents << event_body(event) << "\n"
event_num += 1
end
- @logger.info("get event num: #{event_num}")
+ # @logger.info("get event num: #{event_num}")
@logger.debug("get documents: #{documents}")
hosts = get_host_addresses()
- @request_headers["label"] = label_prefix + "_" + @db + "_" + @table +
"_" + Time.now.strftime('%Y%m%d%H%M%S_%L')
- make_request(documents, hosts, @http_query, 1, hosts.sample)
- end
-
- private
+ http_headers = @request_headers.dup
+ http_headers["label"] = label_prefix + "_" + @db + "_" + @table + "_" +
Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
+
+ # @request_headers["label"] = label_prefix + "_" + @db + "_" + @table +
"_" + Time.now.strftime('%Y%m%d%H%M%S_%L')
+ req_count = 0
+ sleep_for = 1
+ while true
+ response = make_request(documents, http_headers, hosts, @http_query,
hosts.sample)
+
+ req_count += 1
+ response_json = {}
+ begin
+ response_json = JSON.parse(response.body)
+ rescue => e
+ @logger.warn("doris stream load response: #{response} is not a
valid JSON")
+ end
+ if response_json["Status"] == "Success"
+ @total_bytes.addAndGet(documents.size)
+ @total_rows.addAndGet(event_num)
+ break
+ else
+ @logger.warn("FAILED doris stream load response:\n#{response}")
+
+ if @retry_count >= 0 && req_count > @retry_count
+ @logger.warn("DROP this batch after failed #{req_count} times.")
+ if @save_on_failure
+ @logger.warn("Try save to disk.Disk file path :
#{save_dir}/#{table}_#{save_file}")
+ save_to_disk(documents)
+ end
+ break
+ end
- def save_to_disk(documents)
- begin
- file = File.open("#{save_dir}/#{db}_#{table}_#{save_file}", "a")
- file.write(documents)
- rescue IOError => e
- log_failure("An error occurred while saving file to disk: #{e}",
- :file_name => file_name)
- ensure
- file.close unless file.nil?
+ # sleep and then retry
+ sleep_for = sleep_for * 2
+ sleep_for = sleep_for <= 60 ? sleep_for : 60
+ sleep_rand = (sleep_for / 2) + (rand(0..sleep_for) / 2)
+ @logger.warn("Will do retry #{req_count} after sleep #{sleep_rand}
secs.")
+ sleep(sleep_rand)
+ end
end
end
-
private
-
- def make_request(documents, hosts, query, req_count = 1,host = "", uuid =
SecureRandom.hex)
-
+ def make_request(documents, http_headers, hosts, query, host = "")
if host == ""
host = hosts.pop
end
- url = host+query
- @logger.debug("req count: #{req_count}. get url: #{url}")
- @logger.debug("request headers: ", @request_headers)
-
+ url = host + query
- result = RestClient.put(url, documents,@request_headers) { |response,
request, result|
+ if @log_request or @logger.debug?
+ @logger.info("doris stream load request url: #{url} headers:
#{http_headers} body size: #{documents.size}")
+ end
+ @logger.debug("doris stream load request body: #{documents}")
+
+ response = ""
+ begin
+ response = RestClient.put(url, documents, http_headers) { |response,
request, result|
case response.code
when 301, 302, 307
@logger.debug("redirect to:
#{response.headers[:location]}")
@@ -239,77 +252,78 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
else
response.return!
end
- }
-
- @logger.info("response : \n #{result}" )
- result_body = JSON.parse(result.body)
- if result_body['Status'] != "Success"
- if req_count < @automatic_retries
- @logger.warn("Response Status : #{result_body['Status']} .
Retrying...... #{req_count}")
- make_request(documents,hosts,query,req_count + 1,host,uuid)
- return
- end
- @logger.warn("Load failed ! Try #{req_count} times.")
- if @save_on_failure
- @logger.warn("Retry times over #{req_count} times.Try save to
disk.Disk file path : #{save_dir}/#{table}_#{save_file}")
- save_to_disk(documents)
- end
+ }
+ rescue => e
+ log_failure("doris stream load request error: #{e}")
end
+ if @log_request or @logger.debug?
+ @logger.info("doris stream load response:\n#{response}")
+ end
+
+ return response
end # def make_request
+ # Format the HTTP body
+ private
+ def event_body(event)
+ if @message_only
+ event.get("[message]")
+ else
+ LogStash::Json.dump(map_event(event))
+ end
+ end
+
+ private
+ def map_event(event)
+ if @mapping
+ # only get fields in mapping
+ convert_mapping(@mapping, event)
+ else
+ # get all fields
+ event.to_hash
+ end
+ end
+
+ private
+ def convert_mapping(mapping, event)
+ if mapping.is_a?(Hash)
+ mapping.reduce({}) do |acc, kv|
+ k, v = kv
+ acc[k] = convert_mapping(v, event)
+ acc
+ end
+ elsif mapping.is_a?(Array)
+ mapping.map { |elem| convert_mapping(elem, event) }
+ else
+ event.sprintf(mapping)
+ end
+ end
+
+ private
+ def save_to_disk(documents)
+ begin
+ file = File.open("#{save_dir}/#{db}_#{table}_#{save_file}", "a")
+ file.write(documents)
+ rescue IOError => e
+ log_failure("An error occurred while saving file to disk: #{e}",
+ :file_name => file_name)
+ ensure
+ file.close unless file.nil?
+ end
+ end
+
# This is split into a separate method mostly to help testing
- def log_failure(message, opts)
- @logger.warn("[HTTP Output Failure] #{message}", opts)
+ def log_failure(message)
+ @logger.warn("[Doris Output Failure] #{message}")
end
def make_request_headers()
headers = @headers || {}
headers["Expect"] ||= "100-continue"
headers["Content-Type"] ||= "text/plain;charset=utf-8"
- headers["strict_mode"] ||= @strict_mode
headers["Authorization"] = "Basic " +
Base64.strict_encode64("#{user}:#{password.value}")
- # column_separator
- if @column_separator != ""
- headers["column_separator"] = @column_separator
- end
- # timezone
- if @timezone != ""
- headers["timezone"] = @timezone
- end
- # partition
- if @partition.size > 0
- headers["partition"] ||= @partition
- end
- # where
- if @where != ""
- headers["where"] ||= @where
- end
- # timeout
- if @timeout != -1
- headers["timeout"] ||= @timeout
- end
- # max_filter_ratio
- if @max_filter_ratio != -1
- headers["max_filter_ratio"] ||= @max_filter_ratio
- end
- # exec_mem_limit
- if @exec_mem_limit != -1
- headers["exec_mem_limit"] ||= @exec_mem_limit
- end
- # columns
- if @columns != ""
- headers["columns"] ||= @columns
- end
- headers["format"] = @format if @format != ""
- headers["jsonpaths"] = @jsonpaths if @jsonpaths != []
- headers["json_root"] = @json_root if @json_root != ""
- headers["fuzzy_parse"] = @fuzzy_parse if @fuzzy_parse != ""
- headers["num_as_string"] = @num_as_string if @num_as_string != ""
- headers["read_json_by_line"] = @read_json_by_line if @read_json_by_line
!= ""
headers
end
end # end of class LogStash::Outputs::Doris
-
-
diff --git a/extension/logstash/logstash-output-doris.gemspec
b/extension/logstash/logstash-output-doris.gemspec
index ee456a0e42d..163ba260f07 100644
--- a/extension/logstash/logstash-output-doris.gemspec
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -18,8 +18,8 @@ under the License.
=end
Gem::Specification.new do |s|
s.name = 'logstash-output-doris'
- s.version = '0.2.0'
- s.author = 'wfjcmcb'
+ s.version = '1.0.0'
+ s.author = 'Apache Doris'
s.email = '[email protected]'
s.homepage = 'http://doris.apache.org'
s.licenses = ['Apache-2.0']
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]