Hi Aviem,

A great question!

The two backlog methods on UnboundedReader, getSplitBacklogBytes()
<https://github.com/apache/incubator-beam/blob/3a8b9b5212972f0128099251884473d06758e2aa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L247>
and getTotalBacklogBytes()
<https://github.com/apache/incubator-beam/blob/3a8b9b5212972f0128099251884473d06758e2aa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L258>,
are intended to be complementary, i.e., you need only implement one or the
other. From the Javadoc: "One of this or {@link #getSplitBacklogBytes} should
be overridden in order to allow the runner to scale the amount of resources
allocated to the pipeline."

We've found these two sufficient to describe most unbounded sources. For
many, such as KafkaIO, each reader corresponds to some logical partition of
the stream (e.g., a Kafka partition or a CountingSource split), so
getSplitBacklogBytes makes sense. As you have identified, the sources we have
now all seem to fall into this category.
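To make the split-style case concrete, here's a minimal sketch of a reader for one partition of a Kafka-style log. It is standalone Java, not Beam code. PartitionReaderSketch, updatePartitionEnd() and consume() are hypothetical names. Only the method name getSplitBacklogBytes and the -1 "unknown" sentinel mirror Beam's UnboundedReader (which defines BACKLOG_UNKNOWN = -1).

```java
/**
 * Hypothetical reader for a single partition of a Kafka-style log.
 * Standalone sketch, not Beam code: only the method name and the -1
 * "unknown" sentinel mirror Beam's UnboundedReader.
 */
class PartitionReaderSketch {
    /** Same value as Beam's UnboundedReader.BACKLOG_UNKNOWN. */
    static final long BACKLOG_UNKNOWN = -1L;

    private long bytesConsumed = 0L;                  // bytes this reader has read
    private long partitionEndBytes = BACKLOG_UNKNOWN; // partition size at last check

    /** Records the partition's current end, e.g. from a (hypothetical) metadata call. */
    void updatePartitionEnd(long endBytes) {
        this.partitionEndBytes = endBytes;
    }

    /** Records that this reader consumed one record of the given size. */
    void consume(long recordBytes) {
        bytesConsumed += recordBytes;
    }

    /** Unread bytes in this reader's own partition, or BACKLOG_UNKNOWN. */
    long getSplitBacklogBytes() {
        if (partitionEndBytes == BACKLOG_UNKNOWN) {
            return BACKLOG_UNKNOWN; // partition end has never been checked
        }
        // The recorded end may be stale, so clamp instead of going negative.
        return Math.max(0L, partitionEndBytes - bytesConsumed);
    }
}
```

Each reader only needs to know about its own partition. Presumably it is the runner, which sees every reader, that would add these per-split numbers up. No single source instance has to do that sum.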

However, there are other sources where individual readers have no meaning
beyond being consumers from a large shared pool. Messages are assigned
somewhat arbitrarily from the pool to readers, based, e.g., on when they
poll. In this case a per-reader (per-split) backlog makes little sense; the
useful question is simply "how much data is available in the pool?".
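For contrast, here's a sketch of the pool-style case. It is again standalone Java, not Beam code. MessagePoolSketch stands in for a hypothetical queue service. PoolReaderSketch is a hypothetical reader that is just one consumer of that pool. It reports "unknown" for its split backlog and reports the whole pool's size as its total backlog.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical shared message pool standing in for a queue service. */
class MessagePoolSketch {
    private final ConcurrentLinkedQueue<byte[]> messages = new ConcurrentLinkedQueue<>();
    // Tracked separately from the queue, so under concurrency it is approximate.
    private final AtomicLong pendingBytes = new AtomicLong();

    void publish(byte[] payload) {
        messages.add(payload);
        pendingBytes.addAndGet(payload.length);
    }

    /** Hands the next message to whichever reader polls first, or returns null. */
    byte[] poll() {
        byte[] m = messages.poll();
        if (m != null) {
            pendingBytes.addAndGet(-m.length);
        }
        return m;
    }

    long pendingBytes() {
        return pendingBytes.get();
    }
}

/** Hypothetical reader that is only one consumer of the shared pool. */
class PoolReaderSketch {
    /** Same value as Beam's UnboundedReader.BACKLOG_UNKNOWN. */
    static final long BACKLOG_UNKNOWN = -1L;

    private final MessagePoolSketch pool;

    PoolReaderSketch(MessagePoolSketch pool) {
        this.pool = pool;
    }

    /** Takes whatever message the pool hands out next (null if empty). */
    byte[] advance() {
        return pool.poll();
    }

    /** No meaningful per-reader backlog: any reader may receive any message. */
    long getSplitBacklogBytes() {
        return BACKLOG_UNKNOWN;
    }

    /** Backlog of the whole pool; every reader reports the same figure. */
    long getTotalBacklogBytes() {
        return pool.pendingBytes();
    }
}
```

Because every reader reports the same pool-wide number, summing these values across readers would over-count.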

Depending on the source, implementors should pick one or the other. If
they implement both, it's up to runners how to use that signal: they might
do something smart, or might just pick one or the other based on what they
believe is more reliable ;).
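One plausible runner-side policy is sketched below. It is hypothetical, not what any particular runner does. Per-split numbers are summed when every reader reports one. Otherwise the estimator falls back to the total backlog. The total is not summed across readers, because each reader describes the same source; the maximum is taken to tolerate stale samples. BacklogReport and BacklogEstimator are illustrative names only.

```java
import java.util.List;

/** One reader's sampled backlog signals (hypothetical). */
final class BacklogReport {
    final long splitBacklogBytes;
    final long totalBacklogBytes;

    BacklogReport(long splitBacklogBytes, long totalBacklogBytes) {
        this.splitBacklogBytes = splitBacklogBytes;
        this.totalBacklogBytes = totalBacklogBytes;
    }
}

/** Hypothetical runner-side estimate of a source's overall backlog. */
final class BacklogEstimator {
    /** Same value as Beam's UnboundedReader.BACKLOG_UNKNOWN. */
    static final long BACKLOG_UNKNOWN = -1L;

    static long estimate(List<BacklogReport> reports) {
        if (reports.isEmpty()) {
            return BACKLOG_UNKNOWN;
        }
        boolean allSplitsKnown = true;
        long splitSum = 0L;
        long maxTotal = BACKLOG_UNKNOWN; // known values are >= 0, so max() ignores -1
        for (BacklogReport r : reports) {
            if (r.splitBacklogBytes == BACKLOG_UNKNOWN) {
                allSplitsKnown = false;
            } else {
                splitSum += r.splitBacklogBytes;
            }
            maxTotal = Math.max(maxTotal, r.totalBacklogBytes);
        }
        // Prefer per-split data when complete; otherwise use the pool-wide figure.
        return allSplitsKnown ? splitSum : maxTotal;
    }
}
```

A partition-style source yields the sum of its readers' split backlogs. A pool-style source yields the largest reported total. If nobody reports anything, the result is BACKLOG_UNKNOWN.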

Dan

On Tue, Nov 29, 2016 at 12:44 AM, Aviem Zur wrote:

> UnboundedReader exposes method getTotalBacklogBytes() which promises:
> * Returns the size of the backlog of unread data in the underlying data
> source represented by split of this source.
>
> But there are no implementations of this method in any source. I'm assuming
> that this is because splits are instances of the same UnboundedSource impl
> class, which can be distributed to different JVMs or machines, and there
> isn't a context within the source class in which all generated splits'
> backlogs can be summed.
> If this is the case, what is the use of this method in the API?
>
