Alberto,

But isn't that exactly what Trident is supposed to do?
http://storm.apache.org/releases/current/Trident-state.html

After reading more about this subject, it seems Trident gives me exactly-once semantics for free:

> You've seen the intricacies of what it takes to achieve exactly-once
> semantics. The nice thing about Trident is that it internalizes all the
> fault-tolerance logic within the State – as a user you don't have to deal
> with comparing txids, storing multiple values in the database, or anything
> like that.

From that I understand that Trident will take care of it all for me, storing just the txid somewhere for each tuple, as described in an example here:

> You've already seen that storing just the count as the value isn't
> sufficient to know whether you've processed a batch of tuples before.
> Instead, what you can do is store the transaction id with the count in the
> database as an atomic value. Then, when updating the count, you can just
> compare the transaction id in the database with the transaction id for the
> current batch. If they're the same, you skip the update – because of the
> strong ordering, you know for sure that the value in the database
> incorporates the current batch. If they're different, you increment the
> count. This logic works because the batch for a txid never changes, and
> Trident ensures that state updates are ordered among batches.

I'm wondering, though, where Trident stores these internal txids. Would it be in memory? Or ZooKeeper, maybe?

Regards,
Francisco

On Thu, Jun 9, 2016 at 6:23 AM, Alberto São Marcos <[email protected]> wrote:

> Trying to do "exactly once" in distributed systems is not the easiest or
> safest path Francisco. Making the downstream workflow idempotent is usually
> a lot easier and prolly a much robust solution.
>
> On Wed, Jun 8, 2016 at 3:02 PM, Francisco Lopes <[email protected]>
> wrote:
>
>> Hello,
>>
>> I'm new to Storm/Trident and I'm using it to read messages from Kafka and
>> send them to an external API exactly once.
>> My topology is as simple as:
>>
>> IPartitionedTridentSpout kafkaSpout = getKafkaSpout();
>> TridentTopology topology = new TridentTopology();
>>
>> topology
>>     .newStream("kafka", kafkaSpout)
>>     .each(new Fields("str"), new Processor(), new Fields());
>>
>> I'm not sure how I should implement state to guarantee a message is not
>> processed twice.
>>
>> Can anyone please enlighten me?
>>
>> All the examples I found show states as counts or sums and that's not
>> what I really need. I'm inclined to use a Redis instance to store state (
>> https://github.com/kstyrc/trident-redis), but I don't know on what I'd
>> actually persistentAggregate.
>>
>> Thank you.
>>
>> Regards,
>> Francisco
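
P.S. To check my own understanding of the txid comparison quoted from the Trident state docs, here is a minimal sketch of that logic in plain Java. It does not use any Storm or Trident API: TransactionalCounter, TxValue, applyBatch and the in-memory HashMap are all hypothetical names I made up, with the map standing in for the database row that holds the count and the txid together. It only shows the "skip if the stored txid equals the batch txid" step, and it assumes batches arrive in order and that a replayed txid always carries the same tuples.

```java
import java.util.HashMap;
import java.util.Map;

public class TransactionalCounter {

    /** Hypothetical "row": the count plus the txid of the last batch applied to it. */
    private static final class TxValue {
        final long txid;
        final long count;

        TxValue(long txid, long count) {
            this.txid = txid;
            this.count = count;
        }
    }

    // Stand-in for the external database; a real store would have to write
    // the txid and the count together as one atomic value.
    private final Map<String, TxValue> store = new HashMap<>();

    /** Applies one batch's partial count unless that batch was already applied. */
    public void applyBatch(String key, long batchTxid, long batchCount) {
        TxValue current = store.get(key);
        if (current != null && current.txid == batchTxid) {
            // Same txid: the stored count already includes this batch (a replay).
            return;
        }
        long previous = (current == null) ? 0L : current.count;
        store.put(key, new TxValue(batchTxid, previous + batchCount));
    }

    /** Returns the stored count for the key, or 0 if nothing was stored yet. */
    public long count(String key) {
        TxValue value = store.get(key);
        return (value == null) ? 0L : value.count;
    }

    public static void main(String[] args) {
        TransactionalCounter counter = new TransactionalCounter();
        counter.applyBatch("messages", 1L, 3L);
        counter.applyBatch("messages", 1L, 3L); // replay of batch 1, skipped
        counter.applyBatch("messages", 2L, 2L);
        System.out.println(counter.count("messages")); // prints 5
    }
}
```

If I read the docs correctly, this covers only the state update itself. A call to an external API that cannot store a txid next to its own data would not be protected by this check, which I suppose is why making the downstream side idempotent was suggested.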
