Here: (ns dda.test)
(def test-infinite-lazy-seq (repeatedly (fn [] {:id (rand-int 2) :val (rand-int 10)}))) (def test-finite-seq [{:id 1 :val 1} {:id 1 :val 2} {:id 3 :val 1}]) (defn parallel-per [k seqf ls] (pmap #(map seqf %) (vals (group-by #(k %) ls)))) (defn get-next [x] "Your code to call Kinesis for the next x item would be here." (take x test-infinite-lazy-seq)) (def processed (atom [])) (dotimes [_ 3] ; This would be a doseq instead, or whatever you need it to be (swap! processed #(concat % (parallel-per :id (fn [m] (update m :val inc)) (get-next 20))))) (parallel-per :id (fn [m] (update m :val inc)) test-finite-seq) This is what you would do if you wanted to "chunk" it. You'd just use group-by instead of partition-by. The difference is that parallel-per would lose the ability to process infinite sequences, as it is now mostly eager, because group-by is eager. So you'd have to call it in some loop where each time to pass it the next chunk to parallel-per process. On Tuesday, 20 June 2017 19:28:04 UTC-7, Didier wrote: > > Do you want something like this? > > (ns dda.test) > > (def test-infinite-lazy-seq (repeatedly > (fn [] {:id (rand-int 2) > :val (rand-int 10)}))) > > (def test-finite-seq [{:id 1 :val 1} > {:id 1 :val 2} > {:id 3 :val 1}]) > > (defn parallel-per > [k seqf ls] > (pmap #(map seqf %) (partition-by #(k %) ls))) > > (take 10 (parallel-per :id > (fn [m] (update m :val inc)) > test-infinite-lazy-seq)) > > (parallel-per :id > (fn [m] (update m :val inc)) > test-finite-seq) > > > It handles your simple example, and can also handle infinite sequences > lazily, since I assumed your Kinesis stream would be infinite and you want > to process things as they come through. > > Now this only parallelize groups that come through back to back. It is not > possible to do a group by ":id" on an infinite sequence, so the only thing > you could do better then this would be to chunk. So you could take in batch > of 100 from the stream, then group-by on it, and parallelize each groups. I > can try to write a solution for that too if you want. > > > On Tuesday, 20 June 2017 11:57:59 UTC-7, Tom Connors wrote: >> >> Great, I'll watch that video. Thanks again. >> > -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.