Hi everyone.

I've been debugging a streaming job (on dataflow) for a little while now,
and seem to have tracked it down to the Wait.on transform that I'm using.

Some background: our pipeline takes in ~250,000 messages/sec from pubsub,
aggregates them in an hourly window, and then emits the results.  The final
output from the combiner is ~20,000,000 keys (emitted at the end of the
hour).

These results are tee'd: they get written somewhere, and the keys are also
sent into a Wait.on transform, where they wait for a signal before being
written to pubsub.
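For concreteness, the wiring looks roughly like this (a sketch only; the
transform names, key/value types, and the source of the signal collection are
hypothetical stand-ins, not our actual code):

```java
// Hourly-windowed combiner output: ~20M keys at end of window.
PCollection<KV<String, Long>> results =
    input.apply("HourlyCombine", Combine.perKey(Sum.ofLongs()));

// Tee 1: write the results to their primary destination.
results.apply("WriteResults", /* ... primary sink ... */);

// Tee 2: hold the keys until the signal's window completes,
// then publish them to pubsub.
PCollection<Void> signal = /* ... completion signal ... */;
results
    .apply("WaitForSignal", Wait.on(signal))
    .apply("KeysToMessages", /* ... MapElements to PubsubMessage ... */)
    .apply("PublishKeys", PubsubIO.writeMessages().to(topic));
```

Removing the "WaitForSignal" step is the change referred to below that makes
everything work.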

If I let this pipeline run for a couple of hours, the input processing rate
eventually drops below the rate of messages arriving on the queue, and I'll
get a bunch of deadline_exceeded errors from windmill for one specific
worker.  At this point the pipeline is basically unrecoverable and needs to
be restarted.

If I remove the wait transform, everything works great.

My back-of-the-envelope calculation is that the elements going into the
wait transform total ~1 GB, so it's not a huge input either.  My guess
is there's some kind of O(n^2) operation happening here, because this same
pipeline does work fairly reliably with a smaller key space (~100,000-1
million keys).

The other interesting thing I've noticed is that the stage is constantly
processing operations even with no messages coming into it.  (E.g., in my
lower-scale case, the windmill status page shows the stage with ~100,000
active operations/second "succeeding", even though no messages are going
into the stage, since it's not the end of the hour.)  It's also written
10 GB of data, although the dataflow UI says that only 500 MB has gone
into the wait transform.

It seems like there might just be a bug here, but in the interim, is there
any way to construct something similar to the Wait transform without using
side inputs?  My first guess was possibly a CoGroupByKey?
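Roughly what I had in mind (a sketch under the assumption that the signal
can be keyed by the same keys as the data, so the gating happens per key
within the hourly window rather than via a broadcast side input; types and
names are made up):

```java
// Tags for the two inputs to the join.
final TupleTag<Long> dataTag = new TupleTag<Long>() {};
final TupleTag<Void> signalTag = new TupleTag<Void>() {};

// keyedData:   KV<String, Long> -- the combiner output
// keyedSignal: KV<String, Void> -- one element per key once the write is done
PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(dataTag, keyedData)
        .and(signalTag, keyedSignal)
        .apply("JoinWithSignal", CoGroupByKey.create());

// Only emit a key's values once its signal has arrived in the same window.
PCollection<KV<String, Long>> gated = joined.apply(
    "GateOnSignal",
    ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, Long>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        CoGbkResult result = c.element().getValue();
        if (result.getAll(signalTag).iterator().hasNext()) {
          for (Long value : result.getAll(dataTag)) {
            c.output(KV.of(c.element().getKey(), value));
          }
        }
      }
    }));
```

With the default trigger this would fire at the end of the hourly window,
which matches when the combiner emits anyway, but I'm not sure how it
behaves if the signal for a key lands late.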

Thanks!
