spacewander commented on code in PR #6968: URL: https://github.com/apache/apisix/pull/6968#discussion_r867303030
########## apisix/plugins/ext-plugin-post-resp.lua: ########## @@ -0,0 +1,168 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local ext = require("apisix.plugins.ext-plugin.init") +local constants = require("apisix.constants") +local http = require("resty.http") + +local ngx = ngx +local ngx_print = ngx.print +local ngx_flush = ngx.flush +local string = string +local str_sub = string.sub + + +local name = "ext-plugin-post-resp" +local _M = { + version = 0.1, + priority = -4000, + name = name, + schema = ext.schema, +} + + +local function include_req_headers(ctx) + -- TODO: handle proxy_set_header + return core.request.headers(ctx) +end + + +local function get_http_obj(ctx) + return http.new() +end + + +local function close(ctx, http_obj) Review Comment: Ditto ########## apisix/plugins/ext-plugin-post-resp.lua: ########## @@ -0,0 +1,168 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local ext = require("apisix.plugins.ext-plugin.init") +local constants = require("apisix.constants") +local http = require("resty.http") + +local ngx = ngx +local ngx_print = ngx.print +local ngx_flush = ngx.flush +local string = string +local str_sub = string.sub + + +local name = "ext-plugin-post-resp" +local _M = { + version = 0.1, + priority = -4000, + name = name, + schema = ext.schema, +} + + +local function include_req_headers(ctx) + -- TODO: handle proxy_set_header + return core.request.headers(ctx) +end + + +local function get_http_obj(ctx) Review Comment: We don't need the ctx? ########## t/plugin/ext-plugin/response.t: ########## @@ -0,0 +1,420 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); +no_shuffle(); + +add_block_preprocessor(sub { + my ($block) = @_; + + $block->set_value("stream_conf_enable", 1); + + if (!defined $block->extra_stream_config) { + my $stream_config = <<_EOC_; + server { + listen unix:\$TEST_NGINX_HTML_DIR/nginx.sock; + + content_by_lua_block { + local ext = require("lib.ext-plugin") + ext.go({}) + } + } + +_EOC_ + $block->set_value("extra_stream_config", $stream_config); + } + + my $unix_socket_path = $ENV{"TEST_NGINX_HTML_DIR"} . "/nginx.sock"; + my $cmd = $block->ext_plugin_cmd // "['sleep', '5s']"; + my $extra_yaml_config = <<_EOC_; +ext-plugin: + path_for_test: $unix_socket_path + cmd: $cmd +_EOC_ + + $block->set_value("extra_yaml_config", $extra_yaml_config); + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } + + if (!$block->error_log) { + $block->set_value("no_error_log", "[error]\n[alert]"); + } +}); + +run_tests; + +__DATA__ + +=== TEST 1: add route +--- config + location /t { + content_by_lua_block { + local json = require("toolkit.json") + local t = require("lib.test_admin") + + local code, message, res = t.test('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "uri": "/*", + "plugins": { + "ext-plugin-post-resp": { + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say(message) + return + end + + ngx.say(message) + } + } +--- response_body +passed + + + +=== TEST 2: check input +--- request +GET /hello Review Comment: Is it possible to check if the args in the uri are sent to the upstream correctly? ########## apisix/plugins/ext-plugin/init.lua: ########## @@ -682,6 +682,107 @@ local rpc_handlers = { return true end, + nil, -- ignore RPC_EXTRA_INFO, already processed during RPC_HTTP_REQ_CALL interaction + function (conf, ctx, sock, entry) + local lrucache_id = core.lrucache.plugin_ctx_id(ctx, entry) + local token, err = core.lrucache.plugin_ctx(lrucache, ctx, entry, rpc_call, + constants.RPC_PREPARE_CONF, conf, ctx, + lrucache_id) + if not token then + return nil, err + end + + builder:Clear() + local var = ctx.var + + local res = ctx.runner_ext_response + local textEntries = {} + local hdrs = res.headers + for key, val in pairs(hdrs) do + local ty = type(val) + if ty == "table" then + for _, v in ipairs(val) do + core.table.insert(textEntries, build_headers(var, builder, key, v)) + end + else + core.table.insert(textEntries, build_headers(var, builder, key, val)) + end + end + local len = #textEntries + http_resp_call_req.StartHeadersVector(builder, len) + for i = len, 1, -1 do + builder:PrependUOffsetTRelative(textEntries[i]) + end + local hdrs_vec = builder:EndVector(len) + + local id = generate_id() + local status = res.status + + http_resp_call_req.Start(builder) + http_resp_call_req.AddId(builder, id) + http_resp_call_req.AddStatus(builder, status) + http_resp_call_req.AddConfToken(builder, token) + http_resp_call_req.AddHeaders(builder, hdrs_vec) + + local req = http_resp_call_req.End(builder) + builder:Finish(req) + + local ok, err = send(sock, constants.RPC_HTTP_RESP_CALL, builder:Output()) + if not ok then + return nil, "failed to send RPC_HTTP_RESP_CALL: " .. err + end + + local ty, resp = receive(sock) + if ty == nil then + return nil, "failed to receive RPC_HTTP_RESP_CALL: " .. resp + end + + if ty ~= constants.RPC_HTTP_RESP_CALL then + return nil, "failed to receive RPC_HTTP_RESP_CALL: unexpected type " .. ty + end + + local buf = flatbuffers.binaryArray.New(resp) + local call_resp = http_resp_call_resp.GetRootAsResp(buf, 0) + local len = call_resp:HeadersLength() + if len > 0 then + local resp_headers = {} + for i = 1, len do + local entry = call_resp:Headers(i) + local name = str_lower(entry:Name()) + if not exclude_resp_header[name] then + if resp_headers[name] == nil then + core.response.set_header(name, entry:Value()) + resp_headers[name] = true + else + core.response.add_header(name, entry:Value()) + end + end + end + else + -- Filter out origin headeres + for k, v in pairs(res.headers) do + if not exclude_resp_header[str_lower(k)] then + core.response.set_header(k, v) + end + end + end + + local body + local len = call_resp:BodyLength() + if len > 0 then + -- TODO: support empty body + body = call_resp:BodyAsString() + end + local code = call_resp:Status() + core.log.info("recv resp, code: ", code, " body: ", body, " len: ", len) + + if code == 0 then + -- runner change body only, we should set code. Review Comment: ```suggestion -- runner changes body only, we should set code. ``` ########## apisix/plugins/ext-plugin-post-resp.lua: ########## @@ -0,0 +1,168 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local ext = require("apisix.plugins.ext-plugin.init") +local constants = require("apisix.constants") +local http = require("resty.http") + +local ngx = ngx +local ngx_print = ngx.print +local ngx_flush = ngx.flush +local string = string +local str_sub = string.sub + + +local name = "ext-plugin-post-resp" +local _M = { + version = 0.1, + priority = -4000, + name = name, + schema = ext.schema, +} + + +local function include_req_headers(ctx) + -- TODO: handle proxy_set_header + return core.request.headers(ctx) +end + + +local function get_http_obj(ctx) + return http.new() +end + + +local function close(ctx, http_obj) + -- TODO: keepalive + local ok, err = http_obj:close() + if not ok then + core.log.error("close http object failed: ", err) + end +end + + +local function get_response(ctx, http_obj) + local ok, err = http_obj:connect({ + scheme = ctx.upstream_scheme, + host = ctx.picked_server.host, + port = ctx.picked_server.port, + }) + + if not ok then + return nil, err + end + -- TODO: set timeout + local uri, args + if ctx.var.upstream_uri == "" then + -- use original uri instead of rewritten one + uri = ctx.var.uri + else + uri = ctx.var.upstream_uri + + -- the rewritten one may contain new args + local index = core.string.find(uri, "?") + if index then + local raw_uri = uri + uri = str_sub(raw_uri, 1, index - 1) + args = str_sub(raw_uri, index + 1) + end + end + local params = { + path = uri, + query = args or ctx.var.args, + headers = include_req_headers(ctx), + method = core.request.get_method(), + } + + local body, err = core.request.get_body() + if err then + return nil, err + end + + if body then + params["body"] = body + end + + local res, err = http_obj:request(params) + if not res then + return nil, err + end + + return res, err +end + + +function _M.check_schema(conf) + return core.schema.check(_M.schema, conf) +end + + +function _M.before_proxy(conf, ctx) + local http_obj = get_http_obj(ctx) + local res, err = get_response(ctx, http_obj) + if not res or err then + core.log.error("failed to request: ", err or "") + close(ctx, http_obj) + return 502 + end + ctx.runner_ext_response = res + + core.log.info("response info, status: ", res.status) + core.log.info("response info, headers: ", core.json.delay_encode(res.headers)) + + local code, body = ext.communicate(conf, ctx, name, constants.RPC_HTTP_RESP_CALL) + if body then + -- if body the body is changed, the code will be setc + close(ctx, http_obj) + return code, body + end + core.log.info("ext-plugin will send response") + -- send origin response, status maybe changed. + ngx.status = code or res.status + + local reader = res.body_reader + repeat + local chunk, ok, read_err, print_err, flush_err + -- TODO: HEAD or 304 + chunk, read_err = reader() + if read_err then + core.log.error("read response failed: ", read_err) + close(ctx, http_obj) + return 502 Review Comment: I would recommend putting the loop into a function, so we can handle the err in the caller. BTW, we can't set a status code here as the output may be sent. ########## apisix/plugins/ext-plugin-post-resp.lua: ########## @@ -0,0 +1,168 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local ext = require("apisix.plugins.ext-plugin.init") +local constants = require("apisix.constants") +local http = require("resty.http") + +local ngx = ngx +local ngx_print = ngx.print +local ngx_flush = ngx.flush +local string = string +local str_sub = string.sub + + +local name = "ext-plugin-post-resp" +local _M = { + version = 0.1, + priority = -4000, + name = name, + schema = ext.schema, +} + + +local function include_req_headers(ctx) + -- TODO: handle proxy_set_header + return core.request.headers(ctx) +end + + +local function get_http_obj(ctx) + return http.new() +end + + +local function close(ctx, http_obj) + -- TODO: keepalive + local ok, err = http_obj:close() + if not ok then + core.log.error("close http object failed: ", err) + end +end + + +local function get_response(ctx, http_obj) + local ok, err = http_obj:connect({ + scheme = ctx.upstream_scheme, + host = ctx.picked_server.host, + port = ctx.picked_server.port, + }) + + if not ok then + return nil, err + end + -- TODO: set timeout + local uri, args + if ctx.var.upstream_uri == "" then + -- use original uri instead of rewritten one + uri = ctx.var.uri + else + uri = ctx.var.upstream_uri + + -- the rewritten one may contain new args + local index = core.string.find(uri, "?") + if index then + local raw_uri = uri + uri = str_sub(raw_uri, 1, index - 1) + args = str_sub(raw_uri, index + 1) + end + end + local params = { + path = uri, + query = args or ctx.var.args, + headers = include_req_headers(ctx), + method = core.request.get_method(), + } + + local body, err = core.request.get_body() + if err then + return nil, err + end + + if body then + params["body"] = body + end + + local res, err = http_obj:request(params) + if not res then + return nil, err + end + + return res, err +end + + +function _M.check_schema(conf) + return core.schema.check(_M.schema, conf) +end + + +function _M.before_proxy(conf, ctx) + local http_obj = get_http_obj(ctx) + local res, err = get_response(ctx, http_obj) + if not res or err then + core.log.error("failed to request: ", err or "") + close(ctx, http_obj) + return 502 + end + ctx.runner_ext_response = res + + core.log.info("response info, status: ", res.status) + core.log.info("response info, headers: ", core.json.delay_encode(res.headers)) + + local code, body = ext.communicate(conf, ctx, name, constants.RPC_HTTP_RESP_CALL) + if body then + -- if body the body is changed, the code will be setc Review Comment: Typo? ########## apisix/plugins/ext-plugin/init.lua: ########## @@ -682,6 +682,107 @@ local rpc_handlers = { return true end, + nil, -- ignore RPC_EXTRA_INFO, already processed during RPC_HTTP_REQ_CALL interaction + function (conf, ctx, sock, entry) + local lrucache_id = core.lrucache.plugin_ctx_id(ctx, entry) + local token, err = core.lrucache.plugin_ctx(lrucache, ctx, entry, rpc_call, + constants.RPC_PREPARE_CONF, conf, ctx, + lrucache_id) + if not token then + return nil, err + end + + builder:Clear() + local var = ctx.var + + local res = ctx.runner_ext_response + local textEntries = {} + local hdrs = res.headers + for key, val in pairs(hdrs) do + local ty = type(val) + if ty == "table" then + for _, v in ipairs(val) do + core.table.insert(textEntries, build_headers(var, builder, key, v)) + end + else + core.table.insert(textEntries, build_headers(var, builder, key, val)) + end + end + local len = #textEntries + http_resp_call_req.StartHeadersVector(builder, len) + for i = len, 1, -1 do + builder:PrependUOffsetTRelative(textEntries[i]) + end + local hdrs_vec = builder:EndVector(len) + + local id = generate_id() + local status = res.status + + http_resp_call_req.Start(builder) + http_resp_call_req.AddId(builder, id) + http_resp_call_req.AddStatus(builder, status) + http_resp_call_req.AddConfToken(builder, token) + http_resp_call_req.AddHeaders(builder, hdrs_vec) + + local req = http_resp_call_req.End(builder) + builder:Finish(req) + + local ok, err = send(sock, constants.RPC_HTTP_RESP_CALL, builder:Output()) + if not ok then + return nil, "failed to send RPC_HTTP_RESP_CALL: " .. err + end + + local ty, resp = receive(sock) + if ty == nil then + return nil, "failed to receive RPC_HTTP_RESP_CALL: " .. resp + end + + if ty ~= constants.RPC_HTTP_RESP_CALL then + return nil, "failed to receive RPC_HTTP_RESP_CALL: unexpected type " .. ty + end + + local buf = flatbuffers.binaryArray.New(resp) + local call_resp = http_resp_call_resp.GetRootAsResp(buf, 0) + local len = call_resp:HeadersLength() + if len > 0 then + local resp_headers = {} + for i = 1, len do + local entry = call_resp:Headers(i) + local name = str_lower(entry:Name()) + if not exclude_resp_header[name] then + if resp_headers[name] == nil then + core.response.set_header(name, entry:Value()) + resp_headers[name] = true + else + core.response.add_header(name, entry:Value()) + end + end + end + else + -- Filter out origin headeres + for k, v in pairs(res.headers) do + if not exclude_resp_header[str_lower(k)] then + core.response.set_header(k, v) + end + end + end + + local body + local len = call_resp:BodyLength() + if len > 0 then + -- TODO: support empty body + body = call_resp:BodyAsString() + end + local code = call_resp:Status() + core.log.info("recv resp, code: ", code, " body: ", body, " len: ", len) + + if code == 0 then + -- runner change body only, we should set code. + code = body and res.status or nil Review Comment: Look like it's covered in `ngx.status = code or res.status`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
