Cool, the Apache Kafka plugin is exactly what we want. Looking forward to this PR.
Thanks, Ming Wen, Apache APISIX Twitter: _WenMing 蔡兴 <[email protected]> 于2020年1月7日周二 上午11:02写道: > Hi, all, > I want to add a log plugin which produces logs through Kafka > > The general information is as follows: > 1. Collect log information > 2. Get Kafka config > 3. Send Kafka message > > > A brief code presentation: > > ``` > local producer = require "resty.kafka.producer" > > function _M.log(conf, ctx) > local consume = ctx.var.upstream_response_time or 0 > local par = ngx.encode_args(req.params()) > if string.len(par) > 1000 then > par = string.sub(par,1,1000) .. '...' > end > > local log_json = {} > log_json["entry"] = ctx.entry > log_json["uid"] = ctx.request_uid or 2 > log_json["username"] = ctx.request_username or "odin_api" > log_json["user_ip"] = ctx.var.remote_addr > log_json["url"] = ctx.var.scheme .. '://' .. ctx.var.host .. ':' .. > ctx.var.server_port .. ctx.var.uri > log_json["route"] = ctx.var.uri > log_json["host"] = ctx.var.upstream_host > log_json["method"] = ngx.req.get_method() > log_json["params"] = par > log_json["consume"] = ctx.var.request_time * 1000 > log_json["upstream_consume"] = consume * 1000 > log_json["status"] = ngx.status > log_json["logtime"] = time.now() > --- you can log more info > > > --- get kafka confg > local local_conf, err = fetch_local_conf() > if not local_conf then > return nil, nil, err > end > > local opts = clone_tab(local_conf.kafka) > local kafka_topic = opts.topic > local broker_host = opts.broker_list_host > local broker_port = opts.broker_list_port > > local topic = kafka_topic > local broker_list = { > { host = broker_host, port = broker_port }, > } > > -- kafka producer > local p = producer:new(broker_list, { producer_type = "async" }) > > --- send msg > local offset, err = p:send(topic, nil, message) > if not offset then > return > end > > ``` > > > > > >
