Hi Alain,
windowing would be your go-to method. AFAIK there is no way to extend a
window if you expect more messages to arrive, so you would have to
live with multiple batches in case of a prolonged burst. Back pressure,
however, is possible even if you use a buffer and/or windowing. The
solution would look like this:
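import java.util.concurrent.ArrayBlockingQueue;
import org.eclipse.emf.ecore.EObject;
import org.osgi.util.pushstream.PushEvent;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.QueuePolicyOption;

// psp: the PushStreamProvider; eventSource: your PushEventSource<EObject> (name assumed)
PushStream<EObject> stream = psp.buildStream(eventSource)
    .withPushbackPolicy(q -> Math.max(0, q.size() - 650)) // 0 until more than 650 are queued
    .withQueuePolicy(QueuePolicyOption.BLOCK)             // block instead of failing when full
    .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
    .build();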
This PushbackPolicy looks at the queue size and gradually increases the
pushback, starting with 1 (ms) at the 651st element and growing by one
for every further queued element.
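To see how that pushback grows, the following sketch evaluates the same formula against a plain JDK queue, without any OSGi dependency. The class and method names are made up for illustration; the threshold of 650 and the capacity of 1200 come from the builder above, and the result is read as a delay in milliseconds, which is how positive back-pressure values are understood to be treated by PushStreams.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-alone demo of the pushback formula max(0, queueSize - 650).
final class PushbackDemo {
    static final int THRESHOLD = 650;  // from the policy in the mail
    static final int CAPACITY = 1200;  // buffer size from the mail

    // Same arithmetic as the lambda passed to withPushbackPolicy(...)
    static long pushback(BlockingQueue<?> q) {
        return Math.max(0, q.size() - THRESHOLD);
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(CAPACITY);
        for (int i = 1; i <= CAPACITY; i++) {
            q.add(i);
            if (i == 650 || i == 651 || i == 700 || i == CAPACITY) {
                // prints pushback 0, 1, 50 and 550 respectively
                System.out.println(i + " queued -> pushback " + pushback(q));
            }
        }
    }
}
```

So the source is only slowed down once the buffer is a bit more than half full, and the delay reaches 550 ms right before the BLOCK policy would kick in.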
The grouping is another topic. The split method can do your grouping if
you know in advance which groups to expect: it returns an array of
PushStreams, one for each predicate you pass in. Otherwise, you would
need to group each batch you receive with the usual stream methods.
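For the case where the groups are not known up front, here is a minimal sketch of per-batch grouping with java.util.stream. It assumes each batch arrives as a List (for example, the collection handed to the window function) and uses a hypothetical CommitEvent record carrying the data-source key; in the real code that key would have to come from the post-commit event itself.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class BatchGrouping {
    // Hypothetical event type; the real post-commit events (e.g. EObjects)
    // would need some accessor for their data source.
    record CommitEvent(String dataSource, String payload) {}

    // Groups one batch by data source; each group keeps the arrival order.
    static Map<String, List<CommitEvent>> groupBySource(List<CommitEvent> batch) {
        return batch.stream()
                .collect(Collectors.groupingBy(
                        CommitEvent::dataSource,
                        TreeMap::new,           // sorted keys, so output order is stable
                        Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CommitEvent> batch = List.of(
                new CommitEvent("db", "a"),
                new CommitEvent("ldap", "b"),
                new CommitEvent("db", "c"));
        // Prints "db -> [a, c]" and then "ldap -> [b]"
        groupBySource(batch).forEach((source, events) -> System.out.println(
                source + " -> " + events.stream().map(CommitEvent::payload).collect(Collectors.toList())));
    }
}
```

Each map entry can then be processed in its own transaction.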
Regards,
Jürgen.
On 05/01/2019 at 19:47, Alain Picard via osgi-dev wrote:
For now I went with my simple solution of using a window with just a
duration, and that is working fine, even if it might not be the
optimal or most streamlined approach.
Alain
On Sat, Jan 5, 2019 at 5:27 AM Alain Picard <pic...@castortech.com> wrote:
Hi,
We are using push streams to process post-commit events. Those
events originate from different data sources. At the moment we are
processing those individually, but the overhead of having a
transaction for each is too much. Quite often those events come in
bursts following an upstream transaction/change set.
The goal is to group events by data source and batch them, i.e.
wait a bit when an event arrives to see if others are also coming.
If they keep coming, keep collecting a bit longer; otherwise, move on.
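What is described here is essentially a quiet-period (debounce) batch. This is not a PushStream operator; the sketch below hand-rolls the idea on a plain java.util.concurrent.BlockingQueue, independent of any OSGi API. A batch starts with the first event, grows while further events arrive within quietMillis of each other, and is capped at maxEvents so a never-ending burst still gets flushed. The names nextBurst, quietMillis and maxEvents are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical hand-rolled burst collector (not part of the PushStream API).
final class BurstCollector {
    // Blocks for the first event, then keeps collecting until no event arrives
    // within quietMillis or maxEvents is reached. If interrupted, returns what
    // was collected so far (possibly empty) and restores the interrupt flag.
    static <E> List<E> nextBurst(BlockingQueue<E> queue, long quietMillis, int maxEvents) {
        List<E> batch = new ArrayList<>();
        try {
            batch.add(queue.take());                       // a burst starts here
            while (batch.size() < maxEvents) {
                E next = queue.poll(quietMillis, TimeUnit.MILLISECONDS);
                if (next == null) {
                    break;                                 // quiet period elapsed: burst over
                }
                batch.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();            // keep the interrupt visible
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("e1", "e2", "e3", "e4", "e5"));
        System.out.println(nextBurst(queue, 50, 3));  // [e1, e2, e3] (capped at 3)
        System.out.println(nextBurst(queue, 50, 3));  // [e4, e5] (then 50 ms of quiet)
    }
}
```

A loop calling nextBurst on a dedicated thread would hand each burst on for grouping; the cap plays the role maxEvents plays for window, limiting batch size rather than signalling back pressure.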
I see that the PushStream has methods coalesce and window. Window
seems a bit more appropriate here, as it offers both duration and
maxEvents. But it seems to operate all the time rather than starting
a batch upon receiving an event, which doesn't sound optimal in this
case. More concerning to me is the comment regarding back pressure.
We can't use back pressure (we have no control over the producer,
which is implemented via the whiteboard pattern). So here maxEvents
is more a way to limit the batch size than to signal a need for
back pressure.
Still, that doesn't address grouping. I see that there is a fork,
but it is meant to deal with a fixed number of child streams.
Would I be best off just using a window with only a duration,
collecting a number of events, then moving on and using a regular
stream to group them and, if necessary, batch them into smaller groups?
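The last step mentioned, batching a group into smaller pieces, could look like the following sketch: consecutive, order-preserving chunks of at most maxSize events, for example one transaction per chunk. The Chunking class, the chunk helper and the size limit are hypothetical; a sensible limit would depend on what a single transaction can handle.

```java
import java.util.ArrayList;
import java.util.List;

final class Chunking {
    // Splits a list into consecutive, order-preserving chunks of at most maxSize elements.
    static <T> List<List<T>> chunk(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxSize) {
            int to = Math.min(from + maxSize, items.size());
            chunks.add(new ArrayList<>(items.subList(from, to))); // copy, independent of items
        }
        return chunks;
    }

    public static void main(String[] args) {
        // e.g. the events of one data source taken from a single window batch
        List<String> group = List.of("e1", "e2", "e3", "e4", "e5");
        for (List<String> tx : chunk(group, 2)) {
            System.out.println("transaction: " + tx); // [e1, e2], then [e3, e4], then [e5]
        }
    }
}
```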
Cheers,
Alain
_______________________________________________
OSGi Developer Mail List
osgi-dev@mail.osgi.org
https://mail.osgi.org/mailman/listinfo/osgi-dev
--
Jürgen Albert
Managing Director
Data In Motion Consulting GmbH
Kahlaische Str. 4
07745 Jena
Mobile: 0157-72521634
E-Mail: j.alb...@datainmotion.de
Web: www.datainmotion.de
XING: https://www.xing.com/profile/Juergen_Albert5
Legal information
Jena HBR 513025