Hi,

If I remember correctly, Spark doesn't expose the watermark value for the
current batch through the public API. So if you're dealing with "event
time" (and I assume you are, since you're worried about late events), what
you plan to do isn't possible unless you introduce a new logical/physical
plan that exposes the watermark to user-level functions.
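
That said, if knowing the value after the fact is enough for you, the
watermark applied to a batch is reported in the query progress. Here's a
minimal sketch in Scala using the public StreamingQueryListener API (spark
is your SparkSession); note it reports the watermark of the batch that just
completed, not the one in effect inside your user function while the batch
runs:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // eventTime is a java.util.Map[String, String]; the "watermark"
        // entry is present once the query defines a watermark
        val wm = event.progress.eventTime.get("watermark")
        if (wm != null) {
          println(s"watermark after batch ${event.progress.batchId}: $wm")
        }
      }
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
    })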

I tried something similar, counting the number of late events by making
changes to the Spark codebase (see https://github.com/apache/spark/pull/24936).
My initial goal was to provide a side-output of late events so that end
users could deal with them outside the query, but I soon realized that was
non-trivial, and just took the simplest approach at the time. (There are
still possible ideas, e.g. sending the late events to the driver via RPC,
assuming they are a small minority, but none of them has been concluded to
be worth the effort. If your business logic requires it, you could hack on
this yourself, and please share if you succeed.)
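
For what it's worth, if I recall correctly, Spark 3.1+ reports the number
of rows each stateful operator dropped due to the watermark in the query
progress, so you can at least detect *that* late data was discarded, though
not *which* events. A rough sketch (query is your StreamingQuery handle):

    val progress = query.lastProgress
    if (progress != null) {
      progress.stateOperators.foreach { op =>
        // per-operator count of input rows discarded as too late
        println(s"rows dropped by watermark: ${op.numRowsDroppedByWatermark}")
      }
    }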

I'll skip answering your specific questions, since, as explained above,
you'd be stuck even before reaching them.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Tue, Mar 9, 2021 at 6:49 AM Sergey Oboguev <obog...@gmail.com> wrote:

> I have a Spark structured streaming based application that performs
> window(...) construct followed by aggregation.
>
> This construct discards latecomer events that arrive past the watermark. I
> need to be able to detect these late events to handle them out-of-band.
> The application maintains a raw store of all received events and can
> re-aggregate a particular time interval for a particular device in a
> secondary batch mode, as long as it knows that this interval has to be
> re-aggregated, i.e. contains latecomer data that was discarded by
> structured streaming due to the watermark.
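>
> For concreteness, the construct is roughly the following (illustrative
> sketch; the column names and durations are made-up placeholders, and
> spark is the SparkSession):
>
>     import org.apache.spark.sql.functions.{avg, window}
>     import spark.implicits._  // for the $ column syntax
>
>     val aggregated = events
>       .withWatermark("eventTime", "10 minutes")
>       .groupBy($"deviceId", window($"eventTime", "5 minutes"))
>       .agg(avg($"value"))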
>
> I am trying to come up with a way to perform such a detection.
>
> One approach would perhaps be to insert an additional stage before
> window(...) -- a stage that would monitor all events received by the
> stream, look at their timestamps, and predict which events will be
> discarded by window(...) due to the watermark. Such events could then be
> handled outside of Spark structured streaming. The stage could be based on
> Dataset.foreach, Dataset.filter or Dataset.map, passing all events
> through while also monitoring them; when a latecomer condition is
> detected, it would issue a side-channel notification causing the data for
> the affected device and interval to be re-aggregated later from the raw
> event storage, outside the stream.
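>
> Illustratively (Event, MonitorContext, notifyLate(...) and
> delayThresholdMs below are made-up placeholders):
>
>     import java.util.concurrent.atomic.AtomicLong
>
>     case class Event(deviceId: String, eventTime: java.sql.Timestamp,
>                      value: Double)
>
>     object MonitorContext {
>       // per-JVM running maximum of the event timestamps seen so far
>       val maxSeenTs = new AtomicLong(Long.MinValue)
>     }
>
>     // requires import spark.implicits._ for the Event encoder
>     val monitored = events.as[Event].map { e =>
>       val ts = e.eventTime.getTime
>       val maxSeen = MonitorContext.maxSeenTs.updateAndGet(m => math.max(m, ts))
>       if (ts < maxSeen - delayThresholdMs) {
>         notifyLate(e.deviceId, ts) // hypothetical side-channel notification
>       }
>       e // pass the event through unchanged
>     }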
>
> I have a couple of questions related to the feasibility of such a
> construct.
>
> Q1:
>
> Can data behind the window(...) be shared by multiple executors or nodes,
> or is it owned by one executor at any given time? If it is shared, it would
> appear that local monitoring of passing timestamps would be insufficient,
> since it lacks global context.
>
> Q2:
>
> To monitor the stream, the stage needs to maintain a context. The context
> can be checkpointed periodically in an external store, but I do not want to
> persist/read back the context for every microbatch (or, in the foreach case,
> for every individual event). I want to checkpoint the context infrequently,
> and maintain it across microbatches just in memory.
>
> Which brings up a question: the handler function inside the stage (called
> by foreach, map, or filter) needs to refer to the context object, yet it is
> unclear how to obtain such a reference.
>
> I could attach a context to the stream via some global map object
> (translating stream->context), but handler functions for Dataset.foreach,
> Dataset.map, or Dataset.filter do not receive a stream handle, and thus
> have no key to use for translation back to context object.
>
> The association cannot be done via a TLS (per-thread) variable either, since
> Spark can internally create threads for stream processing and they won't
> inherit the parent TLS (and also may not even have the thread that started
> the stream as their parent thread).
>
> This appears to leave a Java static variable as the only option for the
> context pointer, limiting the model to one active stream per executor. But
> does Spark guarantee that different executors will run in different JVM
> instances?
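>
> (For illustration, the per-executor static holder I have in mind would be
> something like the following; ContextStore is a made-up name:)
>
>     object ContextStore {
>       // a Scala object is a singleton per JVM/classloader, so each
>       // executor JVM would get its own copy, created on first use
>       lazy val context = new java.util.concurrent.ConcurrentHashMap[String, Long]()
>     }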
>
> Thanks for advice.
>
