Thanks for the suggestion, Didier, but I was unable to find a way to make pmap work for my use case. For those interested, here's what I came up with, then some questions:
(defn parallel-per "Handle records from input-chan in parallel, but records with matching `splitter` return values serially." [splitter handler input-chan] (let [blockers (atom {}) ;; map of group-key to [chan num-remaining] status-chan (async/chan)] (async/go-loop [] (let [[val port] (async/alts! [input-chan status-chan])] (if (= port input-chan) (if (some? val) (let [group-key (splitter val)] (if-let [blocker (get @blockers group-key)] (let [[blocker-chan ^long num-remaining] blocker next-blocker-chan (async/chan)] (swap! blockers assoc group-key [next-blocker-chan (inc num-remaining)]) (async/go (async/<! blocker-chan) (handler val) (async/put! status-chan group-key) (async/close! next-blocker-chan)) (recur)) (let [blocker-chan (async/chan)] (swap! blockers assoc group-key [blocker-chan 1]) (async/go (handler val) (async/put! status-chan group-key) (async/close! blocker-chan)) (recur)))) (async/close! status-chan)) (let [group-key val [_ ^long num-remaining] (get @blockers group-key)] (if (> num-remaining 1) (do (swap! blockers update-in [group-key 1] dec) (recur)) (do (swap! blockers dissoc group-key) (recur))))))) nil)) Does anything in here look bad? Is there a way to gracefully handle input-chan closing without using loop/recur? I'm using a new channel for every new record to block the next record with the same `splitter` return value - my first approach used one channel per distinct `splitter` value, but I saw some results printed out of order. Does this mean that the order of takes from <! is not deterministic, or that I have/had a bug? -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.