mushenglon-sudo commented on issue #12713:
URL: https://github.com/apache/apisix/issues/12713#issuecomment-3540354537

   This is the source code
   ==============================
   local core = require("apisix.core")
   local helper = require("apisix.plugins.ext-plugin.helper")
   local constants = require("apisix.constants")
   local http = require("resty.http")
   local cjson    = require("cjson.safe") 
   
   local ngx       = ngx
   local ngx_print = ngx.print
   local ngx_flush = ngx.flush
   local string    = string
   local str_sub   = string.sub
   
   
   -- Plugin configuration schema (JSON Schema), passed to core.schema.check by
   -- `_M.check_schema`. Only `xCeaiServiceId` is required.
   local schema = {
       type = "object",
       properties = {
           xCeaiServiceId = {type = "string"},
           -- Endpoint that `before_proxy` POSTs the upstream response body to.
           -- Optional here so configurations that already validate keep validating.
           uri = {type = "string"},
           allow_degradation = {type = "boolean", default = false},
           status_on_error = {type = "integer", minimum = 200, maximum = 599, default = 403},
           ssl_verify = {
               type = "boolean",
               default = true,
           },
           request_method = {
               type = "string",
               default = "POST",
               enum = {"GET", "POST"},
               description = "the method for client to request the authorization service"
           },
           request_headers = {
               type = "array",
               default = {},
               items = {type = "string"},
               description = "client request header that will be sent to the authorization service"
           },
           extra_headers = {
               type = "object",
               minProperties = 1,
               patternProperties = {
                   ["^[^:]+$"] = {
                       type = "string",
                       description = "header value as a string; may contain variables "
                                     .. "like $remote_addr, $request_uri"
                   }
               },
               description = "extra headers sent to the authorization service; "
                           .. "values must be strings and can include variables "
                           .. "like $remote_addr, $request_uri."
           },
           upstream_headers = {
               type = "array",
               default = {},
               items = {type = "string"},
               description = "authorization response header that will be sent to the upstream"
           },
           client_headers = {
               type = "array",
               default = {},
               items = {type = "string"},
               description = "authorization response header that will be sent to "
                              .. "the client when authorizing failed"
           },
           timeout = {
               type = "integer",
               minimum = 1,
               maximum = 60000,
               default = 3000,
               description = "timeout in milliseconds",
           },
           keepalive = {type = "boolean", default = true},
           keepalive_timeout = {type = "integer", minimum = 1000, default = 60000},
           keepalive_pool = {type = "integer", minimum = 1, default = 5},
       },
       required = {"xCeaiServiceId"}
   }
   -- Module table returned at the end of this file: plugin name, version,
   -- priority and the configuration schema defined above.
   local name = "custom-output-check"
   local _M = {
       name = name,
       version = 0.1,
       priority = 2006,
       schema = schema,
   }
   
   --- Validate a plugin configuration.
   -- Runs core.utils.check_tls_bool for the `ssl_verify` field first, then
   -- returns whatever core.schema.check reports for `conf` against `_M.schema`.
   -- @param conf plugin configuration table to validate
   function _M.check_schema(conf)
       local tls_fields = {"ssl_verify"}
       core.utils.check_tls_bool(tls_fields, conf, _M.name)

       return core.schema.check(_M.schema, conf)
   end
   
   
   --- Return the headers of the current client request, exactly as reported by
   -- core.request.headers for `ctx`.
   local function include_req_headers(ctx)
       local request = core.request
       return request.headers(ctx)
   end
   
   
   --- Send the current client request to the picked upstream node and buffer
   -- the complete response body in memory.
   -- @param ctx   request context; reads `upstream_scheme`, `picked_server` and `var`
   -- @param conf  plugin configuration; only `timeout` is read here
   -- @return on completion: res, body (string), read_err (set when reading the
   --         body stopped on an error, otherwise nil)
   -- @return on failure: nil, err
   local function get_response(ctx, conf)
       local http_obj = http.new()
       -- Apply the timeout before connecting so the connect step is bounded too.
       http_obj:set_timeout(conf.timeout)

       local ok, conn_err = http_obj:connect({
           scheme = ctx.upstream_scheme,
           host = ctx.picked_server.host,
           port = ctx.picked_server.port,
       })
       if not ok then
           return nil, conn_err
       end

       -- Build the target URI as-is: do not append a timestamp.
       local uri = ctx.var.upstream_uri
       if not uri or uri == "" then
           uri = ctx.var.uri
       end

       -- Split an inline query string off the path, if there is one.
       local query = nil
       local qpos = core.string.find(uri, "?")
       if qpos then
           query = core.string.sub(uri, qpos + 1)
           uri = core.string.sub(uri, 1, qpos - 1)
       end

       local params = {
           path = uri,
           query = query or ctx.var.args,
           headers = include_req_headers(ctx),
           method = core.request.get_method(),
       }

       local body, body_err = core.request.get_body()
       if body_err then
           http_obj:close()
           return nil, body_err
       end
       if body then
           params.body = body
       end

       local res, req_err = http_obj:request(params)
       if not res then
           http_obj:close()
           return nil, req_err
       end

       -- Read the complete body. Chunks are collected in a table and joined
       -- once, instead of growing a string with `..` on every chunk.
       local chunks, count = {}, 0
       local read_err
       if res.body_reader then
           while true do
               local chunk, chunk_err = res.body_reader()
               if not chunk then
                   read_err = chunk_err
                   break
               end
               count = count + 1
               chunks[count] = chunk
           end
       end
       local body_content = table.concat(chunks)

       core.log.error("=====================响应信息=====================")
       core.log.error("Status: ", res.status or "no response")
       core.log.error("认证服务响应体: ", body_content)

       -- Close exactly once here; every early-return path above that needs to
       -- close the connection has already done so.
       http_obj:close()
       return res, body_content, read_err
   end
   
   --- Write `body_content` to the client, flush it, then finish the request
   -- with `ngx.exit(code)`.
   -- @param code          status passed to ngx.exit
   -- @param body_content  data handed to ngx.print
   -- @return an error string when ngx.print fails, otherwise nil; a failed
   --         flush is only logged as a warning
   local function send_chunk(code,body_content)
       local printed, print_err = ngx_print(body_content)
       if not printed then
           return "output response failed: ".. (print_err or "")
       end

       -- Synchronous flush (argument `true`).
       local flushed, flush_err = ngx_flush(true)
       if not flushed then
           core.log.warn("flush response failed: ", flush_err)
       end

       ngx.exit(code)
       return nil
   end
   
   
   --- Set the response status and send the body to the client.
   -- @param ctx           request context (not used here)
   -- @param code          response status; falls back to 500 when nil
   -- @param body_content  response body to send
   -- @return an error string when sending fails, otherwise nil
   local function send_response(ctx, code, body_content)
       -- Use the same defaulted status for ngx.status and for send_chunk, so a
       -- nil `code` never reaches ngx.exit.
       local status = code or 500
       ngx.status = status

       -- Send the response body.
       return send_chunk(status, body_content)
   end
   
   --- Build the headers sent to the check service: X-Forwarded-* values taken
   -- from the client request, the configured service id, `conf.extra_headers`
   -- (with variables resolved) and the client headers named in
   -- `conf.request_headers`.
   local function build_check_headers(conf, ctx)
       local headers = {
           ["X-Forwarded-Proto"] = core.request.get_scheme(ctx),
           ["X-Forwarded-Method"] = core.request.get_method(),
           ["X-Forwarded-Host"] = core.request.get_host(ctx),
           ["X-Ceai-Service-Id"] = conf.xCeaiServiceId,  -- pass the model id along
           ["X-Forwarded-Uri"] = ctx.var.request_uri,
           ["X-Forwarded-For"] = core.request.get_remote_client_ip(ctx),
       }

       -- The client request's Content-Length / Transfer-Encoding /
       -- Content-Encoding / Expect are deliberately NOT copied: the body posted
       -- to the check service is the upstream *response* body, so those
       -- request-side framing headers would not describe it.

       if conf.extra_headers then
           for header, value in pairs(conf.extra_headers) do
               if type(value) == "number" then
                   value = tostring(value)
               end
               local resolved, err = core.utils.resolve_var(value, ctx.var)
               if err then
                   core.log.error("failed to resolve variable in extra header '",
                                  header, "': ",value,": ",err)
               else
                   headers[header] = resolved
               end
           end
       end

       -- Append headers that must be taken from the client request; values
       -- already set above win.
       for _, header in ipairs(conf.request_headers or {}) do
           if not headers[header] then
               headers[header] = core.request.header(ctx, header)
           end
       end

       return headers
   end


   --- POST `body_content` to the check service at `conf.uri`. Failures are
   -- logged only; the result does not influence the response sent to the client.
   local function post_to_check_service(conf, ctx, body_content)
       if not conf.uri then
           core.log.error("no check service uri configured, ",
                          "skip forwarding the response body")
           return
       end

       local params = {
           headers = build_check_headers(conf, ctx),
           keepalive = conf.keepalive,
           ssl_verify = conf.ssl_verify,
           method = "POST",  -- always POST, regardless of conf.request_method
           body = body_content,
       }
       if conf.keepalive then
           params.keepalive_timeout = conf.keepalive_timeout
           params.keepalive_pool = conf.keepalive_pool
       end

       local httpc = http.new()
       httpc:set_timeout(conf.timeout)
       -- No explicit close afterwards: request_uri is a single-shot call that
       -- is expected to release the connection itself (see resty.http docs).
       local check_res, check_err = httpc:request_uri(conf.uri, params)
       if not check_res then
           core.log.error("failed to request check service ", conf.uri, ": ", check_err)
       end
   end


   --- before_proxy phase handler.
   -- Fetches the upstream response via get_response, forwards the body of a 200
   -- response to the check service, then sends the original status and body to
   -- the client through send_response.
   -- @param conf plugin configuration
   -- @param ctx  request context
   -- @return 502 when the upstream response cannot be obtained, or when sending
   --         fails before any header was sent; otherwise nothing
   function _M.before_proxy(conf, ctx)
       local res, body_content, err = get_response(ctx, conf)
       if not res or err then
           -- On failure get_response returns `nil, err`, so the reason arrives
           -- in the second return value.
           core.log.error("failed to get upstream response: ",
                          tostring(err or body_content))
           return 502
       end

       local code = res.status or 500
       if res.status == 200 then
           post_to_check_service(conf, ctx, body_content)
       end

       -- Send the original response; the status may have been changed.
       err = send_response(ctx, code, body_content)
       if err then
           core.log.error("send response error: ", tostring(err))
           return not ngx.headers_sent and 502 or nil
       end
       return
   end
   return _M


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to