Hi Alain,

Windowing would be your go-to method. AFAIK there is no way to extend a window if you expect more messages to arrive, so you would have to live with multiple batches in case of a prolonged burst. Back pressure, however, is possible even if you use a buffer and/or windowing. The solution would look like this:

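psp.createPushStreamBuilder()
    // no pushback up to 650 queued events, then one more per extra event
    .withPushbackPolicy(q -> Math.max(0, q.size() - 650))
    // block the producer thread once the buffer is completely full
    .withQueuePolicy(QueuePolicyOption.BLOCK)
    // bounded buffer holding at most 1200 events
    .withBuffer(new ArrayBlockingQueue<PushEvent<? extends EObject>>(1200))
    .build();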

This PushbackPolicy looks at the queue size and gradually increases the pushback, starting with a value of one on the 651st element.
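
To make the numbers concrete, here is a minimal plain-Java sketch of just the arithmetic behind that policy. It does not touch any OSGi types; the class PushbackSketch and the helper pushback(queueSize, threshold) are hypothetical names introduced only for illustration.

```java
// Plain-Java sketch of the arithmetic only; no OSGi types are involved.
final class PushbackSketch {

    // Hypothetical helper: mirrors Math.max(0, q.size() - 650), with the
    // threshold passed in as a parameter instead of being hard-coded.
    static long pushback(int queueSize, int threshold) {
        return Math.max(0, queueSize - threshold);
    }

    public static void main(String[] args) {
        int threshold = 650;
        // Print the pushback value for a few representative queue sizes.
        for (int size : new int[] {0, 650, 651, 900, 1200}) {
            System.out.println(size + " queued -> pushback " + pushback(size, threshold));
        }
    }
}
```

With a threshold of 650 and a buffer of 1200, the value stays at 0 up to 650 queued events and grows by one per additional event, reaching 550 when the buffer is full.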

Grouping is another topic. The split method can do your grouping if you know which groups to expect. It essentially returns an array of push streams, one for each predicate you give it. For everything else, you would need to group each batch you get with the usual stream methods.
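
As a rough sketch of that last option, the following plain-Java code groups one batch by data source and, if needed, cuts a group into smaller chunks. It assumes the batch is the collection of events that a window hands you; the Event class, its source and payload fields, and the helpers groupBySource and chunk are hypothetical names made up for this example, not part of the push stream API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class BatchGrouping {

    // Hypothetical event type: only the data source matters for grouping.
    static final class Event {
        final String source;
        final String payload;

        Event(String source, String payload) {
            this.source = source;
            this.payload = payload;
        }
    }

    // Groups one batch by data source, keeping first-seen order of the sources.
    static Map<String, List<Event>> groupBySource(Collection<Event> batch) {
        return batch.stream()
                .collect(Collectors.groupingBy(e -> e.source, LinkedHashMap::new, Collectors.toList()));
    }

    // Cuts a list into consecutive chunks of at most maxSize elements.
    static <T> List<List<T>> chunk(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < items.size(); i += maxSize) {
            chunks.add(new ArrayList<>(items.subList(i, Math.min(i + maxSize, items.size()))));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Event> batch = Arrays.asList(
                new Event("db1", "a"), new Event("db2", "b"), new Event("db1", "c"));
        // One transaction per data source (or per chunk) instead of one per event.
        groupBySource(batch).forEach((source, events) ->
                System.out.println(source + ": " + chunk(events, 100).size() + " chunk(s), "
                        + events.size() + " event(s)"));
    }
}
```

Each group (or chunk) can then be processed in a single transaction, which is the overhead reduction the original question was after.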

Regards,

Jürgen.

On 05/01/2019 at 19:47, Alain Picard via osgi-dev wrote:
For now I went with my simple solution of using a window with just duration, and that is working fine, even if it might not be the most optimal or streamlined approach.

Alain


On Sat, Jan 5, 2019 at 5:27 AM Alain Picard <pic...@castortech.com> wrote:

    Hi,

    We are using push streams to process post-commit events. Those
    events originate from different data sources. At the moment we are
    processing those individually, but the overhead of having a
    transaction for each is too much. Quite often those events come in
    bursts following an upstream transaction/ change set.

    The goal is to group events by data source and batch them, i.e.
    wait a bit when an event arrives to see if others are also coming.
    If they keep coming, keep collecting a bit longer, o/w move on.

    I see that the PushStream has methods coalesce and window. Window
    seems a bit more appropriate here, as it offers both duration and
    maxEvents. But it seems to operate all the time, and not start a
    batch upon receiving an event, which doesn't sound optimal in this
    case. More concerning to me is the comment regarding
    back pressure. We can't use back pressure (no control over the
    producer, which is implemented via whiteboard). So here maxEvents is
    more a way to limit the batch than to indicate a need for back pressure.

    Still, that doesn't address grouping. I see that there is a fork,
    but that is made to deal with a fixed number of child streams.

    Would I be best off just using a window with only a duration,
    collecting a number of events, then moving on and using a regular
    stream to group them and, if necessary, batch them in smaller groups?

    Cheers,

    Alain


_______________________________________________
OSGi Developer Mail List
osgi-dev@mail.osgi.org
https://mail.osgi.org/mailman/listinfo/osgi-dev


--
Jürgen Albert
Managing Director

Data In Motion Consulting GmbH

Kahlaische Str. 4
07745 Jena

Mobile: 0157-72521634
E-Mail: j.alb...@datainmotion.de
Web: www.datainmotion.de

XING:   https://www.xing.com/profile/Juergen_Albert5

Legal

Jena HBR 513025
