mushenglon-sudo commented on issue #12713:
URL: https://github.com/apache/apisix/issues/12713#issuecomment-3540354537
This is the source code
==============================
local core = require("apisix.core")
local helper = require("apisix.plugins.ext-plugin.helper")
local constants = require("apisix.constants")
local http = require("resty.http")
local cjson = require("cjson.safe")
local ngx = ngx
local ngx_print = ngx.print
local ngx_flush = ngx.flush
local string = string
local str_sub = string.sub
local schema = {
type = "object",
properties = {
xCeaiServiceId = {type = "string"},
allow_degradation = {type = "boolean", default = false},
status_on_error = {type = "integer", minimum = 200, maximum = 599,
default = 403},
ssl_verify = {
type = "boolean",
default = true,
},
request_method = {
type = "string",
default = "POST",
enum = {"GET", "POST"},
description = "the method for client to request the
authorization service"
},
request_headers = {
type = "array",
default = {},
items = {type = "string"},
description = "client request header that will be sent to the
authorization service"
},
extra_headers = {
type = "object",
minProperties = 1,
patternProperties = {
["^[^:]+$"] = {
type = "string",
description = "header value as a string; may contain
variables"
.. "like $remote_addr, $request_uri"
}
},
description = "extra headers sent to the authorization service; "
.. "values must be strings and can include variables"
.. "like $remote_addr, $request_uri."
},
upstream_headers = {
type = "array",
default = {},
items = {type = "string"},
description = "authorization response header that will be sent
to the upstream"
},
client_headers = {
type = "array",
default = {},
items = {type = "string"},
description = "authorization response header that will be sent
to"
.. "the client when authorizing failed"
},
timeout = {
type = "integer",
minimum = 1,
maximum = 60000,
default = 3000,
description = "timeout in milliseconds",
},
keepalive = {type = "boolean", default = true},
keepalive_timeout = {type = "integer", minimum = 1000, default =
60000},
keepalive_pool = {type = "integer", minimum = 1, default = 5},
},
required = {"xCeaiServiceId"}
}
local name = "custom-output-check"
local _M = {
version = 0.1,
priority = 2006,
name = name,
schema = schema,
}
function _M.check_schema(conf)
core.utils.check_tls_bool({"ssl_verify"}, conf, _M.name)
return core.schema.check(_M.schema, conf)
end
local function include_req_headers(ctx)
return core.request.headers(ctx)
end
local function get_response(ctx, conf)
local http_obj = http.new()
local ok, err = http_obj:connect({
scheme = ctx.upstream_scheme,
host = ctx.picked_server.host,
port = ctx.picked_server.port,
})
if not ok then
return nil, err
end
http_obj:set_timeout(conf.timeout)
-- 正确构建 URI:不加时间戳!
local uri = ctx.var.upstream_uri and ctx.var.upstream_uri ~= "" and
ctx.var.upstream_uri or ctx.var.uri
local query = nil
local qpos = core.string.find(uri, "?")
if qpos then
query = core.string.sub(uri, qpos + 1)
uri = core.string.sub(uri, 1, qpos - 1)
end
local params = {
path = uri,
query = query or ctx.var.args,
headers = include_req_headers(ctx),
method = core.request.get_method(),
}
local body, err = core.request.get_body()
if err then
http_obj:close()
return nil, err
end
if body then
params.body = body
end
local res, err = http_obj:request(params)
if not res then
http_obj:close()
return nil, err
end
-- 读取完整 body
local body_content = ""
if res.body_reader then
local chunk
repeat
chunk, err = res.body_reader()
if chunk then body_content = body_content .. chunk end
until not chunk
end
core.log.error("=====================响应信息=====================")
core.log.error("Status: ", res and res.status or "no response")
core.log.error("认证服务响应体: ", body_content)
-- 注意:不要手动 close,让 resty.http 自动管理(或只 close 一次)
http_obj:close() -- 可选,但不要重复 close
return res, body_content, err
end
local function send_chunk(code,body_content)
local ok, print_err = ngx_print(body_content)
if not ok then
return "output response failed: ".. (print_err or "")
end
local ok, flush_err = ngx_flush(true)
if not ok then
core.log.warn("flush response failed: ", flush_err)
end
ngx.exit(code)
return nil
end
local function send_response(ctx, code, body_content)
ngx.status = code or 500
-- 发送响应体
local err = send_chunk(code,body_content)
if err then
return err
end
return nil
end
function _M.before_proxy(conf, ctx)
local res, body_content, err = get_response(ctx,conf)
if not res or err then
return 502
end
local code = (res and res.status) or 500
if res and res.status == 200 then
-- 业务逻辑
local auth_headers = {
["X-Forwarded-Proto"] = core.request.get_scheme(ctx),
["X-Forwarded-Method"] = core.request.get_method(),
["X-Forwarded-Host"] = core.request.get_host(ctx),
["X-Ceai-Service-Id"] = conf.xCeaiServiceId, -- 添加模型ID到请求头
["X-Forwarded-Uri"] = ctx.var.request_uri,
["X-Forwarded-For"] = core.request.get_remote_client_ip(ctx),
}
if conf.request_method == "POST" then
auth_headers["Content-Length"] = core.request.header(ctx,
"content-length")
auth_headers["Expect"] = core.request.header(ctx, "expect")
auth_headers["Transfer-Encoding"] = core.request.header(ctx,
"transfer-encoding")
auth_headers["Content-Encoding"] = core.request.header(ctx,
"content-encoding")
end
if conf.extra_headers then
for header, value in pairs(conf.extra_headers) do
if type(value) == "number" then
value = tostring(value)
end
local resolve_value, err = core.utils.resolve_var(value,
ctx.var)
if not err then
auth_headers[header] = resolve_value
end
if err then
core.log.error("failed to resolve variable in extra
header '",
header, "': ",value,": ",err)
end
end
end
-- append headers that need to be get from the client request header
if #conf.request_headers > 0 then
for _, header in ipairs(conf.request_headers) do
if not auth_headers[header] then
auth_headers[header] = core.request.header(ctx, header)
end
end
end
local params = {
headers = auth_headers,
keepalive = conf.keepalive,
ssl_verify = conf.ssl_verify,
method = "POST", -- 固定使用POST方法
body = body_content,
}
if conf.keepalive then
params.keepalive_timeout = conf.keepalive_timeout
params.keepalive_pool = conf.keepalive_pool
end
local httpc = http.new()
httpc:set_timeout(conf.timeout)
local res, err = httpc:request_uri(conf.uri, params)
httpc:close()
end
-- send origin response, status maybe changed.
err = send_response(ctx, code, body_content)
if err then
core.log.error("send response error: ", tostring(err))
return not ngx.headers_sent and 502 or nil
end
return
end
return _M
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]