Here:

(ns dda.test)

(def test-infinite-lazy-seq (repeatedly
                             (fn [] {:id (rand-int 2)
                                     :val (rand-int 10)})))

(def test-finite-seq [{:id 1 :val 1}
                      {:id 1 :val 2}
                      {:id 3 :val 1}])

(defn parallel-per
  [k seqf ls]
  (pmap #(map seqf %) (vals (group-by #(k %) ls))))

(defn get-next [x]
  "Your code to call Kinesis for the next x item would be here."
  (take x test-infinite-lazy-seq))

(def processed (atom []))
(dotimes [_ 3] ; This would be a doseq instead, or whatever you need it to 
be
  (swap! processed
         #(concat % (parallel-per :id
                                  (fn [m] (update m :val inc))
                                  (get-next 20)))))

(parallel-per :id
              (fn [m] (update m :val inc))
              test-finite-seq)


This is what you would do if you wanted to "chunk" it. You'd just use 
group-by instead of partition-by. The difference is that parallel-per would 
lose the ability to process infinite sequences, as it is now mostly eager, 
because group-by is eager. So you'd have to call it in some loop where each 
time to pass it the next chunk to parallel-per process.


On Tuesday, 20 June 2017 19:28:04 UTC-7, Didier wrote:
>
> Do you want something like this?
>
> (ns dda.test)
>
> (def test-infinite-lazy-seq (repeatedly
>                              (fn [] {:id (rand-int 2)
>                                      :val (rand-int 10)})))
>
> (def test-finite-seq [{:id 1 :val 1}
>                       {:id 1 :val 2}
>                       {:id 3 :val 1}])
>
> (defn parallel-per
>   [k seqf ls]
>   (pmap #(map seqf %) (partition-by #(k %) ls)))
>
> (take 10 (parallel-per :id
>                        (fn [m] (update m :val inc))
>                        test-infinite-lazy-seq))
>
> (parallel-per :id
>               (fn [m] (update m :val inc))
>               test-finite-seq)
>
>
> It handles your simple example, and can also handle infinite sequences 
> lazily, since I assumed your Kinesis stream would be infinite and you want 
> to process things as they come through.
>
> Now this only parallelize groups that come through back to back. It is not 
> possible to do a group by ":id" on an infinite sequence, so the only thing 
> you could do better then this would be to chunk. So you could take in batch 
> of 100 from the stream, then group-by on it, and parallelize each groups. I 
> can try to write a solution for that too if you want.
>
>
> On Tuesday, 20 June 2017 11:57:59 UTC-7, Tom Connors wrote:
>>
>> Great, I'll watch that video. Thanks again.
>>
>

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to