Perhaps this has already been suggested; I haven't been following the thread that closely, but you could approach it this way:
1. Enrichment parsers consume immediately from the queue.
2. Parsers using an enrichment inspect the first message in the queue. If it is less than <delay> minutes old, the parser waits to consume it. After the delay, the parser consumes the message.

Thanks,
Carolyn

On 11/4/16, 10:50 AM, "Nick Allen" <n...@nickallen.org> wrote:

>>
>> A sleep/wait cycle is another way to do this (simply delay everything in
>> xyz topology by 30 seconds) which isn't as nice, but is also probably way
>> less complicated to implement.
>
>It definitely has the advantage of being simpler to implement. But doing a
>sleep in a message-passing architecture like Storm doesn't feel right to
>me. Just my gut.
>
>On Fri, Nov 4, 2016 at 10:43 AM, zeo...@gmail.com <zeo...@gmail.com> wrote:
>
>> I think we've come to a better way to do this, which is sort of a
>> waitUntil(exists || timeout), but the issue is checking whether something
>> exists, because that requires some sort of timestamp to avoid collisions
>> (due to source port reuse, etc.). I don't know the best way to do this
>> offhand. Here's a general scenario:
>>
>> 1) SSH syslog comes in -> parses -> insert to HBase {ip_login_src,
>> src_port, ip_login_dst, ip_login_dst_hostname, account, timestamp,
>> success_bool} via streaming enrichment
>>
>> 2) Network logs come in saying ip_src_addr logged into ip_dst_addr ->
>> parses -> enriches (checks for whitelists, then if appropriate sets
>> is_alert = T) -> indexes
>>
>> What I want is for the network logs to be enriched with the SSH HBase
>> data (almost exactly this use case:
>> <https://cwiki.apache.org/confluence/display/METRON/2016/06/16/Metron+Tutorial+-+Fundamentals+Part+6%3A+Streaming+Enrichment>),
>> using ip_src_addr, src_port, ip_dst_addr, account, and maybe some sort of
>> fuzzy timestamp?
>> Then we can hash them all together and use the result as a lookup
>> key, but I'm not sure how to handle timestamps without having 3
>> identifiers (1 for the current time +/- 3 minutes, 1 for the previous
>> 3-minute segment, and 1 for the following 3-minute segment).
>>
>> A sleep/wait cycle is another way to do this (simply delay everything in
>> xyz topology by 30 seconds) which isn't as nice, but is also probably way
>> less complicated to implement.
>>
>> We're discussing this in IRC (soon to be Slack ^.^) as well.
>>
>> Jon
>>
>> On Fri, Nov 4, 2016 at 10:28 AM Otto Fowler <ottobackwa...@gmail.com>
>> wrote:
>>
>> So spout orchestration/gating?
>>
>> Spout checks for an external state flag:
>>
>> if CURRENT - process
>> if UPDATING - wait
>>
>> With the ingesting agent setting the flag to UPDATING while it runs?
>>
>> On November 4, 2016 at 09:29:16, zeo...@gmail.com (zeo...@gmail.com)
>> wrote:
>>
>> Is there a good method (e.g., something using Stellar/ZK) to implement an
>> intentional processing delay for all tuples in a specific topology? I plan
>> to do some custom enrichments, but the data used to do the enrichment *may*
>> be ingested at roughly the same time as the data to be enriched (it also
>> may never be sent). So I'd like to add a delay in my cluster that applies
>> to certain parser topologies.
>>
>> I looked around in the documentation and in JIRA and didn't find
>> anything available or being worked on, but I did see that this may conflict
>> with METRON-322. Essentially what I'm considering is a {sleep,delay,wait}
>> Stellar function, but it could also be a delay in a parser's Kafka spout
>> (I'm much less of a fan of the second option).
>>
>> I'm looking for feedback on the best way to approach this, and I'd be happy
>> to do the work myself (if necessary) when it gets to that point. I did
>> consider implementing this delay upstream (in the sensor itself), but after
>> looking in more detail it doesn't seem as feasible.
>>
>> Jon
>> --
>>
>> Jon
>
>--
>Nick Allen <n...@nickallen.org>
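A rough sketch of Carolyn's step 2, assuming each message carries the time it was ingested: the dependent parser looks only at the head of its queue and consumes it once it is at least <delay> old. Everything here is hypothetical (`Message`, `DelayedConsumer`, the `deque` standing in for a Kafka topic, the injectable `clock`); it is not Metron, Storm, or Kafka API.

```python
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Deque, List, Optional


@dataclass
class Message:
    ingest_ts: float                      # when the message entered the queue (epoch seconds)
    payload: dict = field(default_factory=dict)


class DelayedConsumer:
    """Hypothetical dependent-parser consumer: it takes the head of its queue
    only once that message is at least `delay_s` seconds old."""

    def __init__(self, delay_s: float, clock: Callable[[], float] = time.time):
        self.delay_s = delay_s
        self.clock = clock                # injectable so tests can fake time
        self.queue: Deque[Message] = deque()

    def offer(self, msg: Message) -> None:
        """Messages are appended in ingest order, as they would arrive from a topic."""
        self.queue.append(msg)

    def wait_time(self) -> Optional[float]:
        """Seconds until the head may be consumed (0.0 if ready), None if empty."""
        if not self.queue:
            return None
        age = self.clock() - self.queue[0].ingest_ts
        return max(0.0, self.delay_s - age)

    def poll(self) -> List[Message]:
        """Consume every head message that is old enough. Because the queue is in
        ingest order, the first message that is too young ends the batch."""
        ready: List[Message] = []
        while self.queue and self.wait_time() == 0.0:
            ready.append(self.queue.popleft())
        return ready
```

Because `wait_time()` reports how long until the head is ready, a caller can come back later instead of blocking, which may sit better with Nick's objection to sleeping inside Storm. Whether a spout's `nextTuple()` is the right place for this check is an assumption, not something settled in the thread.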
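One way to read Jon's fuzzy-timestamp question, sketched under assumptions: the writer (the SSH streaming enrichment) stores each login under a single key built from the join fields plus a 3-minute time bucket, and only the reader probes three buckets (previous, current, next), then filters on the real timestamp so a reused source port from an older session doesn't match. As long as the tolerance is no larger than the bucket width, any login within the tolerance falls into one of those three buckets. The dict standing in for HBase, the SHA-256 key hashing, and the names `LoginStore`, `row_key`, `bucket`, and `wait_until` are all hypothetical; `wait_until` is just the waitUntil(exists || timeout) idea with an injectable clock.

```python
import hashlib
import time
from typing import Callable, Dict, List, Optional

WINDOW_S = 180  # 3-minute bucket width, taken from the thread's "+/- 3 minutes"


def bucket(ts: float, width: int = WINDOW_S) -> int:
    return int(ts // width)


def row_key(ip_src: str, src_port: int, ip_dst: str, account: str, b: int) -> str:
    """Hash the join fields plus the time bucket into one lookup key."""
    raw = "|".join([ip_src, str(src_port), ip_dst, account, str(b)])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


class LoginStore:
    """In-memory stand-in for the HBase table the SSH stream enriches into."""

    def __init__(self) -> None:
        self.rows: Dict[str, List[dict]] = {}

    def put(self, rec: dict) -> None:
        # Writer side: exactly one key, for the bucket the login falls in.
        key = row_key(rec["ip_login_src"], rec["src_port"], rec["ip_login_dst"],
                      rec["account"], bucket(rec["timestamp"]))
        self.rows.setdefault(key, []).append(rec)

    def lookup(self, ip_src: str, src_port: int, ip_dst: str, account: str,
               ts: float, tolerance: float = WINDOW_S) -> Optional[dict]:
        # The 3-bucket probe is only complete if tolerance <= bucket width.
        if tolerance > WINDOW_S:
            raise ValueError("tolerance must not exceed the bucket width")
        # Reader side: probe the previous, current and next bucket, then keep
        # only records whose real timestamp is within the tolerance.
        b = bucket(ts)
        candidates: List[dict] = []
        for probe in (b - 1, b, b + 1):
            candidates += self.rows.get(row_key(ip_src, src_port, ip_dst, account, probe), [])
        close = [r for r in candidates if abs(r["timestamp"] - ts) <= tolerance]
        # If several sessions match, prefer the one closest in time.
        return min(close, key=lambda r: abs(r["timestamp"] - ts), default=None)


def wait_until(check: Callable[[], Optional[dict]], timeout_s: float,
               poll_s: float = 1.0,
               clock: Callable[[], float] = time.monotonic,
               sleep: Callable[[float], None] = time.sleep) -> Optional[dict]:
    """waitUntil(exists || timeout): return the first non-None result of
    check(), or None once timeout_s has elapsed."""
    deadline = clock() + timeout_s
    while True:
        found = check()
        if found is not None or clock() >= deadline:
            return found
        sleep(poll_s)
```

The write side stays at one key per login; the three identifiers Jon mentions only show up as three reads at lookup time. Note that `wait_until` still sleeps between checks, so in a Storm topology it runs into the same concern Nick raised unless the retry is scheduled some other way.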
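Otto's gating idea, sketched with an in-memory flag standing in for external state such as a ZooKeeper node (an assumption; the thread doesn't say where the flag would live). `State`, `StateFlag`, `ingesting`, and `gated_poll` are hypothetical names. The sketch assumes a single ingesting agent; with several, the first one to finish would flip the flag back to CURRENT while the others are still loading.

```python
import threading
from contextlib import contextmanager
from enum import Enum
from typing import Callable, Iterator, List


class State(Enum):
    CURRENT = "CURRENT"
    UPDATING = "UPDATING"


class StateFlag:
    """In-memory stand-in for an external flag shared by the agent and the spout."""

    def __init__(self) -> None:
        self._lock = threading.Lock()     # agent and spout may run on different threads
        self._state = State.CURRENT

    def get(self) -> State:
        with self._lock:
            return self._state

    def set(self, state: State) -> None:
        with self._lock:
            self._state = state


@contextmanager
def ingesting(flag: StateFlag) -> Iterator[None]:
    """Ingesting agent: mark the enrichment data UPDATING while it loads, and
    restore CURRENT even if the load fails."""
    flag.set(State.UPDATING)
    try:
        yield
    finally:
        flag.set(State.CURRENT)


def gated_poll(flag: StateFlag, fetch: Callable[[], List[dict]]) -> List[dict]:
    """Spout side: if CURRENT, process; if UPDATING, emit nothing this round."""
    if flag.get() is State.UPDATING:
        return []
    return fetch()
```

A flag like this only holds back messages while an ingest is known to be in progress; it doesn't help when the enrichment data arrives later or never arrives, which is the case Jon's timeout is meant to cover.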