Thanks for the suggestion, Didier, but I was unable to find a way to make 
pmap work for my use case. For those interested, here's what I came up 
with, then some  questions:

(defn parallel-per
  "Handle records from input-chan in parallel, but records with matching 
`splitter` return values serially."
  [splitter handler input-chan]
  (let [blockers (atom {}) ;; map of group-key to [chan num-remaining]
        status-chan (async/chan)]
    (async/go-loop []
      (let [[val port] (async/alts! [input-chan status-chan])]
        (if (= port input-chan)
          (if (some? val)
            (let [group-key (splitter val)]
              (if-let [blocker (get @blockers group-key)]
                (let [[blocker-chan ^long num-remaining] blocker
                      next-blocker-chan (async/chan)]
                  (swap! blockers assoc group-key [next-blocker-chan (inc 
num-remaining)])
                  (async/go
                    (async/<! blocker-chan)
                    (handler val)
                    (async/put! status-chan group-key)
                    (async/close! next-blocker-chan))
                  (recur))
                (let [blocker-chan (async/chan)]
                  (swap! blockers assoc group-key [blocker-chan 1])
                  (async/go
                    (handler val)
                    (async/put! status-chan group-key)
                    (async/close! blocker-chan))
                  (recur))))
            (async/close! status-chan))
          (let [group-key val
                [_ ^long num-remaining] (get @blockers group-key)]
            (if (> num-remaining 1)
              (do
                (swap! blockers update-in [group-key 1] dec)
                (recur))
              (do
                (swap! blockers dissoc group-key)
                (recur)))))))
    nil))

Does anything in here look bad? Is there a way to gracefully handle 
input-chan closing without using loop/recur? I'm using a new channel for 
every new record to block the next record with the same `splitter` return 
value - my first approach used one channel per distinct `splitter` value, 
but I saw some results printed out of order. Does this mean that the order 
of takes from <! is not deterministic, or that I have/had a bug?

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to