Hrm. I'm not sure why you'd see the output in a different order. You definitely 
shouldn't be; Heka does guarantee that data from a single stream will be 
processed in the order received, and AFAICT there's nothing you've done here 
that would impact the order of the output.

I notice that the `process_message` function in your output is missing a return 
code, which is weird, because I'd expect that to not work at all.

One thing you might try as a debugging step is to use a FileOutput instead of a 
SandboxOutput to see if that gives you any better results. You'll need a 
SandboxEncoder to insert the count into your output. If that gives you 
consistent ordering, then we've at least narrowed the problem down.
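To make that concrete, here's a minimal sketch of what I mean, assuming the section names, the output path, and the encoder filename (all hypothetical — adjust to your layout). FileOutput writes encoded messages straight to a file, and the SandboxEncoder is where you'd append your counter to each record:

```toml
# Hypothetical debugging config: replaces the SandboxOutput with a
# FileOutput plus a SandboxEncoder. Section names, path, and filename
# are placeholders.
[summary_file_output]
type = "FileOutput"
message_matcher = "Fields[type] == 'summary_aggregated'"
path = "/tmp/summary_debug.log"
encoder = "summary_encoder"

[summary_encoder]
type = "SandboxEncoder"
filename = "lua/encoder/summary-encoder.lua"
```

The encoder's `process_message` would read `Fields[consumer]`, tack on a running count, and hand the result back via `inject_payload`, so the FileOutput ends up writing the same "consumer + count" lines you're producing now.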

-r


On 12/23/2015 12:26 AM, Udara Chathuranga wrote:
Hello,

I have implemented a Heka pipeline, which has an input, a decoder, a filter, and an output.

Suppose the log file has two entries for two customers, with the fields (name, 
source, event, destination):

1. john,09400000900, reg1 , 09400000800

2. sam,09400000901, reg2 , 09400000801


When these go through the pipeline they are written to an output file 
(pipeline and sandbox code are below).

The output log should then show:

094000008000

094000009011

It doesn't always show in that order. Each time I restart the pipeline the
order of the two events changes, even though it is the same log file (I
remove the sandbox preservation and log streamer directories before each
run).

E.g., sometimes it shows the following, even though the only thing done is 
restarting the pipeline:

094000009010

094000008001

Is there a race condition happening somewhere between the filter and output 
plug-ins? Any help would be much appreciated.

_heka.toml_

# input and decoder are here #

[event-filter]
type = "SandboxFilter"
message_matcher = "Fields[type] == 'event_decode'"
filename = "lua/counter/consumer.lua"
preserve_data = true
memory_limit = 20000000

[mongo_summary_output]
type = "SandboxOutput"
message_matcher = "Fields[type] == 'summary_aggregated'"
filename = "lua/output/summary-output.lua"

_Filter_

    local message = {
        Payload = nil,
        Fields = {}
    }

    function process_message()
        message.Fields = {}

        if read_message("Fields[event-type]") == "reg1" then
            message.Fields["consumer"] = read_message("Fields[destination-address]")
        elseif read_message("Fields[event-type]") == "reg2" then
            message.Fields["consumer"] = read_message("Fields[source-address]")
        end

        message.Fields.type = "summary_aggregated"
        if not pcall(inject_message, message) then return -1 end

        return 0
    end


_Output_

    local count = 0

    local buffer_file = "/home/eshan/buffer.log"

    function process_message()
        local outputbatch = read_message("Fields[consumer]") .. " " .. count

        local backup_file, e = io.open(buffer_file, "a+")
        backup_file:write(outputbatch)
        backup_file:close()
        count = count + 1
    end

Regards,

Udara Chathuranga



_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka
