karthi5634 commented on issue #12394:
URL: https://github.com/apache/apisix/issues/12394#issuecomment-3026157896

   This is the issue I am facing while using Kafka-logger and as per your 
suggestion I have created custom kafka apisix plugin.
   Below is the issue I have raised for kafka-logger where JSON data is not 
processing properly.
   Please refer:
   https://github.com/apache/apisix/issues/12201
   
   **Custom Kafka Plugin code :**
   local core = require("apisix.core")
   local producer = require("resty.kafka.producer")
   
   local schema = {
       type = "object",
       properties = {
           broker_list = {
               type = "object",
               minProperties = 1
           },
           topic = { type = "string" },
           producer_type = { type = "string", enum = { "sync", "async" } }
       },
       required = { "broker_list", "topic" }
   }
   
   local plugin_name = "kafka-json-producer"
   
   local _M = {
       version = 0.8,
       priority = 10,
       name = plugin_name,
       schema = schema,
   }
   
   function _M.access(conf, ctx)
       local raw_body = core.request.get_body()
   
       if not raw_body then
           core.log.error("❌ No request body found")
           return 400, { message = "Missing request body" }
       end
   
       local kafka_producer = producer:new(conf.broker_list, {
           producer_type = conf.producer_type or "sync",
           required_acks = 1,
           request_timeout = 2000
       })
   
       -- Manually fetch and inject metadata if needed
       local metadata, err = kafka_producer.client:fetch_metadata(conf.topic)
       if err then
           core.log.error("❌ Failed to fetch Kafka metadata: ", err)
       else
           core.log.warn("📦 Raw Kafka metadata: ", core.json.encode(metadata))
   
           -- Force inject brokers from metadata
           if metadata and metadata.brokers then
               for _, broker in ipairs(metadata.brokers) do
                   local key = broker.host .. ":" .. broker.port
                   kafka_producer.client.brokers[key] = {
                       host = broker.host,
                       port = broker.port,
                       socket_config = {},
                       version = kafka_producer.client.version
                   }
               end
           end
   
           -- Confirm what's now stored in client
           core.log.warn("💡 Injected Kafka brokers: ", 
core.json.encode(kafka_producer.client.brokers))
       end
   
       -- fallback to partition 0
       local ok, send_err = kafka_producer:send({
           topic = conf.topic,
           partition_id = 0
       }, raw_body)
   
       if not ok then
           core.log.error("❌ Kafka send failed: ", send_err)
           return 500, { message = "Kafka send failed", error = send_err }
       end
   
       core.log.info("✅ Kafka message sent to topic: ", conf.topic)
       return 200, { message = "Kafka send success" }
   end
   
   return _M
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to