Baoyuantop commented on issue #12394: URL: https://github.com/apache/apisix/issues/12394#issuecomment-3051581474
> [kafka-json-producer.txt](https://github.com/user-attachments/files/21121655/kafka-json-producer.txt)
>
> hi [@Baoyuantop](https://github.com/Baoyuantop) I was not able to upload .lua extension files. PFA updated custom kafka plugin code and let me know. It is matching with kafka-logger plugin code.

Just pasting the code here for easy viewing:

```lua
local core = require("apisix.core")
local producer = require("resty.kafka.producer")

local schema = {
    type = "object",
    properties = {
        broker_list = { type = "object", minProperties = 1 },
        topic = { type = "string" },
        producer_type = { type = "string", enum = { "sync", "async" } }
    },
    required = { "broker_list", "topic" }
}

local plugin_name = "kafka-json-producer"

local _M = {
    version = 0.8,
    priority = 10,
    name = plugin_name,
    schema = schema,
}

function _M.access(conf, ctx)
    local raw_body = core.request.get_body()
    if not raw_body then
        core.log.error("❌ No request body found")
        return 400, { message = "Missing request body" }
    end

    local kafka_producer = producer:new(conf.broker_list, {
        producer_type = conf.producer_type or "sync",
        required_acks = 1,
        request_timeout = 2000
    })

    -- Manually fetch and inject metadata if needed
    local metadata, err = kafka_producer.client:fetch_metadata(conf.topic)
    if err then
        core.log.error("❌ Failed to fetch Kafka metadata: ", err)
    else
        core.log.warn("📦 Raw Kafka metadata: ", core.json.encode(metadata))

        -- Force inject brokers from metadata
        if metadata and metadata.brokers then
            for _, broker in ipairs(metadata.brokers) do
                local key = broker.host .. ":" .. broker.port
                kafka_producer.client.brokers[key] = {
                    host = broker.host,
                    port = broker.port,
                    socket_config = {},
                    version = kafka_producer.client.version
                }
            end
        end

        -- Confirm what's now stored in client
        core.log.warn("💡 Injected Kafka brokers: ", core.json.encode(kafka_producer.client.brokers))
    end

    -- fallback to partition 0
    local ok, send_err = kafka_producer:send({ topic = conf.topic, partition_id = 0 }, raw_body)
    if not ok then
        core.log.error("❌ Kafka send failed: ", send_err)
        return 500, { message = "Kafka send failed", error = send_err }
    end

    core.log.info("✅ Kafka message sent to topic: ", conf.topic)
    return 200, { message = "Kafka send success" }
end

return _M
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
