karthi5634 commented on issue #12394: URL: https://github.com/apache/apisix/issues/12394#issuecomment-3026157896
This is the issue I am facing while using Kafka-logger and as per your suggestion I have created custom kafka apisix plugin. Below is the issue I have raised for kafka-logger where JSON data is not processing properly. Please refer: https://github.com/apache/apisix/issues/12201 **Custom Kafka Plugin code :** local core = require("apisix.core") local producer = require("resty.kafka.producer") local schema = { type = "object", properties = { broker_list = { type = "object", minProperties = 1 }, topic = { type = "string" }, producer_type = { type = "string", enum = { "sync", "async" } } }, required = { "broker_list", "topic" } } local plugin_name = "kafka-json-producer" local _M = { version = 0.8, priority = 10, name = plugin_name, schema = schema, } function _M.access(conf, ctx) local raw_body = core.request.get_body() if not raw_body then core.log.error("❌ No request body found") return 400, { message = "Missing request body" } end local kafka_producer = producer:new(conf.broker_list, { producer_type = conf.producer_type or "sync", required_acks = 1, request_timeout = 2000 }) -- Manually fetch and inject metadata if needed local metadata, err = kafka_producer.client:fetch_metadata(conf.topic) if err then core.log.error("❌ Failed to fetch Kafka metadata: ", err) else core.log.warn("📦 Raw Kafka metadata: ", core.json.encode(metadata)) -- Force inject brokers from metadata if metadata and metadata.brokers then for _, broker in ipairs(metadata.brokers) do local key = broker.host .. ":" .. broker.port kafka_producer.client.brokers[key] = { host = broker.host, port = broker.port, socket_config = {}, version = kafka_producer.client.version } end end -- Confirm what's now stored in client core.log.warn("💡 Injected Kafka brokers: ", core.json.encode(kafka_producer.client.brokers)) end -- fallback to partition 0 local ok, send_err = kafka_producer:send({ topic = conf.topic, partition_id = 0 }, raw_body) if not ok then core.log.error("❌ Kafka send failed: ", send_err) return 500, { message = "Kafka send failed", error = send_err } end core.log.info("✅ Kafka message sent to topic: ", conf.topic) return 200, { message = "Kafka send success" } end return _M -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org