Hello,

I have implemented a Heka pipeline with an input, a decoder, a filter, and an output.

The log file contains two entries related to two customers:
(name, source, event, destination)

1. john, 09400000900, reg1 , 09400000800

2. sam,  09400000901, reg2 , 09400000801


When these go through the pipeline they are written to an output file
again (the pipeline and sandbox code is below).

At the output, the log events should show:

09400000800 0

09400000901 1

It doesn't always show in that order. When I restart the pipeline, the
order of the two events changes each time, even though it is the same
log file. (I remove the sandbox preservation and log streamer
directories before each run.)

E.g., sometimes it shows the following, even though the only thing done
was restarting the pipeline:

09400000901 0

09400000800 1

Is there a race condition happening somewhere between the filter and
output plug-ins? Any help would be much appreciated.

*heka.toml*

# input and decoder are here #

[event-filter]
type = "SandboxFilter"
message_matcher = "Fields[type] == 'event_decode'"
filename = "lua/counter/consumer.lua"
preserve_data = true
memory_limit = 20000000

[mongo_summary_output]
type = "SandboxOutput"
message_matcher =  "Fields[type] == 'summary_aggregated'"
filename = "lua/output/summary-output.lua"

*Filter*

    local message = {
        Payload = nil,
        Fields = {}
    }

    function process_message()
        message.Fields = {}

        local event_type = read_message("Fields[event-type]")
        if event_type == "reg1" then
            message.Fields["consumer"] = read_message("Fields[destination-address]")
        elseif event_type == "reg2" then
            message.Fields["consumer"] = read_message("Fields[source-address]")
        end

        message.Fields.type = "summary_aggregated"
        if not pcall(inject_message, message) then return -1 end

        return 0
    end


*Output*

    local count = 0

    local buffer_file = "/home/eshan/buffer.log"

    function process_message()
        local outputbatch = read_message("Fields[consumer]") .. " " .. count

        local backup_file, e = io.open(buffer_file, "a+")
        if not backup_file then return -1 end    -- fail the message if the file can't be opened
        backup_file:write(outputbatch, "\n")     -- newline so each event lands on its own line
        backup_file:close()
        count = count + 1

        return 0
    end
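One way to narrow down where the reordering happens (a sketch only; the `filter_seq` field name is my own invention, but `read_message` and `inject_message` are the standard Heka sandbox API) is to have the filter stamp each injected message with its own counter, and have the output write both numbers. If `filter_seq` arrives at the output out of order, the reordering is happening before the output plug-in:

```lua
-- In the filter: stamp each injected message with a sequence number.
local seq = 0

-- ...inside process_message(), before inject_message(message):
seq = seq + 1
message.Fields["filter_seq"] = seq

-- In the output: write the filter's counter next to the output's own,
-- so the two orderings can be compared line by line.
local line = read_message("Fields[filter_seq]") .. " " ..
             read_message("Fields[consumer]") .. " " .. count
```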

Regards,

Udara Chathuranga
_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka
