[GitHub] [apisix] bisakhmondal commented on a change in pull request #6203: feat(batchprocessor): support partial consumption of batch entries

2022-01-26 Thread GitBox


bisakhmondal commented on a change in pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#discussion_r793275311



##
File path: apisix/plugins/datadog.lua
##
@@ -108,127 +109,144 @@ local function generate_tag(entry, const_tags)
 end
 
 
-function _M.log(conf, ctx)
-    local entry = fetch_log(ngx, {})
-    entry.balancer_ip = ctx.balancer_ip or ""
-    entry.scheme = ctx.upstream_scheme or ""
+local function send_metric_over_udp(entry, metadata)
+    local err_msg
+    local sock = udp()
+    local host, port = metadata.value.host, metadata.value.port
 
-    -- if prefer_name is set, fetch the service/route name. If the name is nil, fall back to id.
-    if conf.prefer_name then
-        if entry.service_id and entry.service_id ~= "" then
-            local svc = service_fetch(entry.service_id)
+    local ok, err = sock:setpeername(host, port)
+    if not ok then
+        return false, "failed to connect to UDP server: host[" .. host
+                      .. "] port[" .. tostring(port) .. "] err: " .. err
+    end
 
-            if svc and svc.value.name ~= "" then
-                entry.service_id = svc.value.name
-            end
-        end
+    -- Generate prefix & suffix according to the dogstatsd UDP data format.
+    local suffix = generate_tag(entry, metadata.value.constant_tags)
+    local prefix = metadata.value.namespace
+    if prefix ~= "" then
+        prefix = prefix .. "."
+    end
 
-    if ctx.route_name and ctx.route_name ~= "" then
-        entry.route_id = ctx.route_name
-    end
+    -- request counter
+    ok, err = sock:send(format("%s:%s|%s%s", prefix ..
+                               "request.counter", 1, "c", suffix))

Review comment:
   Fixed
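For context, dogstatsd accepts plain-text UDP datagrams of the form `metric.name:value|type|#tag1:v1,tag2:v2`, which is what the `prefix`/`suffix` assembly in the diff above produces. A minimal Python sketch of that assembly (the helper name `dogstatsd_counter` is mine, not from the plugin):

```python
def dogstatsd_counter(namespace, metric, value, tags):
    # Build a dogstatsd-style counter datagram: "<ns>.<metric>:<value>|c|#k:v,..."
    # Mirrors the prefix/suffix construction in send_metric_over_udp.
    prefix = namespace + "." if namespace else ""
    suffix = "|#" + ",".join(tags) if tags else ""
    return "%s%s:%s|c%s" % (prefix, metric, value, suffix)

print(dogstatsd_counter("apisix", "request.counter", 1, ["route_name:r1"]))
# apisix.request.counter:1|c|#route_name:r1
```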




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





bisakhmondal commented on a change in pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#discussion_r792501257



##
File path: apisix/utils/batch-processor.lua
##
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice + 1] = batch[i]
+    end
+    return slice
+end
+
+
 function execute_func(premature, self, batch)
     if premature then
         return
     end
 
-    local ok, err = self.func(batch.entries, self.batch_max_size)
+    --- In case of "err" and a valid "n", the batch processor considers all n-1 entries to have been
+    --- successfully consumed and hence reschedules the job for entries with index n to #entries
+    --- based on the current retry policy.
+    local ok, err, n = self.func(batch.entries, self.batch_max_size)
     if not ok then
-        core.log.error("Batch Processor[", self.name,
-                       "] failed to process entries: ", err)
+        if n then
+            core.log.error("Batch Processor[", self.name, "] failed to process entries [",
+                           #batch.entries + 1 - (n or -1), "/", #batch.entries, "]: ", err)

Review comment:
   Oh damn, got your point. It's the if block, haha
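For readers following along, the semantics of `slice_batch` in the diff above can be sketched in Python (1-based `n`, with a nil `n` keeping the whole batch):

```python
def slice_batch(batch, n=None):
    # Mirror of the Lua helper above: keep entries from 1-based index n
    # to the end; n = None (Lua nil) keeps the whole batch.
    return batch[(n or 1) - 1:]

print(slice_batch(["a", "b", "c", "d"], 3))  # ['c', 'd']
```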









bisakhmondal commented on a change in pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#discussion_r792500662



##
File path: apisix/utils/batch-processor.lua
##
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice + 1] = batch[i]
+    end
+    return slice
+end
+
+
 function execute_func(premature, self, batch)
     if premature then
         return
     end
 
-    local ok, err = self.func(batch.entries, self.batch_max_size)
+    --- In case of "err" and a valid "n", the batch processor considers all n-1 entries to have been
+    --- successfully consumed and hence reschedules the job for entries with index n to #entries
+    --- based on the current retry policy.
+    local ok, err, n = self.func(batch.entries, self.batch_max_size)
     if not ok then
-        core.log.error("Batch Processor[", self.name,
-                       "] failed to process entries: ", err)
+        if n then
+            core.log.error("Batch Processor[", self.name, "] failed to process entries [",
+                           #batch.entries + 1 - (n or -1), "/", #batch.entries, "]: ", err)

Review comment:
   For it not to be nil, we would have to update all existing consumer implementations. Also, it kind of forces future implementations to return something like `false, err, 1` when a consumer consumes the entries in one go and then hits an error. Do we really need that sort of complication? I think it's better to leave that part to the consumers themselves, i.e. whether they want to use this subtle feature or not. What do you say?
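To illustrate the contract being debated, here is a hypothetical consumer in Python (the names `make_consumer` and `send` are mine, not from APISIX): returning a third value `n` on failure tells the batch processor that entries before index `n` were consumed, while returning no `n` keeps the old retry-everything behaviour.

```python
def make_consumer(send):
    # `send` is a hypothetical per-entry transport call returning (ok, err).
    def process_entries(entries, batch_max_size):
        # On failure, n (1-based) marks the first unconsumed entry so the
        # batch processor can retry only entries[n..#entries]; omitting n
        # (returning None) falls back to retrying the whole batch.
        for i, entry in enumerate(entries):
            ok, err = send(entry)
            if not ok:
                return False, err, i + 1
        return True, None, None
    return process_entries

# Example: a transport that rejects the entry "bad".
consumer = make_consumer(lambda e: (e != "bad", "send failed" if e == "bad" else None))
print(consumer(["a", "bad", "c"], 3))  # (False, 'send failed', 2)
```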









bisakhmondal commented on a change in pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#discussion_r792496732



##
File path: apisix/utils/batch-processor.lua
##
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice + 1] = batch[i]

Review comment:
   Oh, I see. I thought the length operator `#` was an O(1) operation, but it seems it gets recomputed in O(log n) on each invocation.
   Nice nitpicking :+1: 

##
File path: apisix/utils/batch-processor.lua
##
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice + 1] = batch[i]
+    end
+    return slice
+end
+
+
 function execute_func(premature, self, batch)
     if premature then
         return
     end
 
-    local ok, err = self.func(batch.entries, self.batch_max_size)
+    --- In case of "err" and a valid "n", the batch processor considers all n-1 entries to have been

Review comment:
   Yes, definitely



