Hi Jan,
I have been working on a timeseries extension which makes use of
many of these techniques for joining two temporal streams. It's
almost ready for a PR; I will ping it here when it is, as it might
be useful for you. In general, I borrowed a lot of techniques from
the CoGroupBy code.
/1) need to figure out how to get Coder of input PCollection
of stateful ParDo inside StatefulDoFnRunner/
My join takes in a <K, V1, V2>; in the outer transform I use
things like ((KvCoder) leftCollection.getCoder()).getValueCoder().
Then, when creating the Join transform, I can pass the coders in and
defer the StateSpec object creation until the constructor is called.
/2) there are performance considerations, that can be solved
probably only by Sorted Map State [2]/
Sorted Map is going to be awesome; until then the only option is
to create a cache in the DoFn to make it more efficient. For the
cache to work you need to key it on window + key and do things like
clearing the cache in @StartBundle. Better to wait for Sorted Map if
this is not time critical.
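A minimal sketch of such a cache, written in plain Java so it runs without Beam on the classpath. `WindowKeyCache` and its methods are hypothetical names, not Beam APIs; inside a real DoFn, `startBundle()` would be called from the `@StartBundle` method and the `loader` would read the join's state. It assumes windows and keys are non-null and implement `equals`/`hashCode`, and that updates are written through to state by the caller.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical cache for a stateful join DoFn. State in a stateful ParDo is
// scoped per key and window, so the cache key combines both.
final class WindowKeyCache<W, K, V> {
  private final Map<List<Object>, V> entries = new HashMap<>();
  private int loads = 0; // number of cache misses that went to the loader

  // Call from the DoFn's @StartBundle method: drop everything cached so far.
  void startBundle() {
    entries.clear();
  }

  // Returns the cached value for (window, key), invoking loader (e.g. a state
  // read) only on a miss. Window and key must be non-null (List.of rejects nulls).
  V get(W window, K key, Supplier<V> loader) {
    return entries.computeIfAbsent(List.of(window, key), k -> {
      loads++;
      return loader.get();
    });
  }

  // Write-through update: the caller is assumed to write the same value to state.
  void put(W window, K key, V value) {
    entries.put(List.of(window, key), value);
  }

  int loads() {
    return loads;
  }
}
```

Clearing per bundle is the conservative choice: the cache never serves a value read in an earlier bundle, which could be stale if that bundle was retried or the state was changed outside this DoFn instance.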
/3) additional work is needed for allowedLateness to work
correctly (and there are at least two ways how to solve this),
see the design doc [3]/
Yup, in my case I can support this by not garbage-collecting the
right side of the join for now, but that is a compromise.
/4) more tests (for batch and validatesRunner) are needed/
I just posted a question on this list about the best way to make
use of the @ValidatesRunner annotation; sounds like it might be
useful to you as well :-)
On Thu, 6 Jun 2019 at 23:03, Jan Lukavský <[email protected]> wrote:
Hi,
I have written a PoC implementation of this in [1] and I'd like to
discuss some implementation details. First of all, I'd appreciate any
feedback about this. There are some known issues:
1) need to figure out how to get Coder of input PCollection of
stateful ParDo inside StatefulDoFnRunner
2) there are performance considerations, that can be solved probably
only by Sorted Map State [2]
3) additional work is needed for allowedLateness to work correctly
(and there are at least two ways how to solve this), see the design
doc [3]
4) more tests (for batch and validatesRunner) are needed
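To illustrate why item 2 points at Sorted Map State, here is a hypothetical per-key buffer in plain Java, with a `TreeMap` standing in for the proposed sorted state. Elements are stored by event-time timestamp and released in order once the watermark passes them; `TimeSortedBuffer` and its methods are illustrative names, not Beam APIs, and timestamps are assumed to be epoch milliseconds as `long`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical per-key buffer that releases elements in event-time order.
// The TreeMap stands in for the proposed Sorted Map State.
final class TimeSortedBuffer<T> {
  private final TreeMap<Long, List<T>> byTimestamp = new TreeMap<>();

  // Buffer an element; elements with equal timestamps keep arrival order.
  void add(long timestamp, T value) {
    byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
  }

  // Remove and return, in timestamp order, every element whose timestamp is
  // at or before the watermark.
  List<T> flushUpTo(long watermark) {
    NavigableMap<Long, List<T>> ready = byTimestamp.headMap(watermark, true);
    List<T> out = new ArrayList<>();
    for (List<T> values : ready.values()) {
      out.addAll(values);
    }
    ready.clear(); // headMap returns a view, so this also removes the entries from byTimestamp
    return out;
  }
}
```

With only a `BagState`, each flush would presumably have to read every buffered element, sort it, and write back the remainder, whereas a sorted map allows reading and clearing just the prefix up to the watermark.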
I have come across a few bugs in DirectRunner, which I tried to solve:
a) timers seem to be broken in stateful ParDo with side inputs
b) timers need to be sorted by timestamp, otherwise state might be
cleared before it gets a chance to be flushed
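Issue b) can be modeled in a few lines of plain Java: if a flush timer and an end-of-window cleanup timer are both due when the watermark jumps, firing them in timestamp order ensures the flush runs first. This is a hypothetical sketch (`TimerQueue` and `PendingTimer` are illustrative names), not DirectRunner code, and it ignores timers set while other timers are firing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical event-time timer: an id plus the timestamp it is due at.
final class PendingTimer {
  final String id;
  final long timestamp;

  PendingTimer(String id, long timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }
}

// Minimal model of a runner firing timers when the watermark advances.
// Timers are ordered by timestamp, not by the order in which they were set;
// timers with equal timestamps fire in unspecified order.
final class TimerQueue {
  private final PriorityQueue<PendingTimer> pending =
      new PriorityQueue<PendingTimer>((a, b) -> Long.compare(a.timestamp, b.timestamp));

  void set(String id, long timestamp) {
    pending.add(new PendingTimer(id, timestamp));
  }

  // Fires every timer due at or before the new watermark, earliest first,
  // and returns their ids in firing order.
  List<String> advanceWatermarkTo(long watermark) {
    List<String> fired = new ArrayList<>();
    while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
      fired.add(pending.poll().id);
    }
    return fired;
  }
}
```

If the timers were instead fired in the order they were set, a cleanup timer set before the flush timer would clear the state first, which is the failure described in b).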
Thanks for feedback,
Jan
[1] https://github.com/apache/beam/pull/8774
[2] http://mail-archives.apache.org/mod_mbox/beam-dev/201905.mbox/%3ccalstk6+ldemtjmnuysn3vcufywjkhmgv1isfbdmxthoqh91...@mail.gmail.com%3e
[3] https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
On 5/23/19 4:40 PM, Robert Bradshaw wrote:
> Thanks for writing this up.
>
> I think the justification for adding this to the model needs to be
> that it is useful (you have this covered, though some examples would
> be nice) and that it's something that can't easily be done by users
> themselves (specifically, though it can be (relatively) cheaply done
> in streaming and batch, it's done in very different ways, and also
> that it's hard to do via composition).
>
> On Thu, May 23, 2019 at 4:10 PM Jan Lukavský <[email protected]> wrote:
>> Hi,
>>
>> I have written a very brief draft of how it might be possible to
>> implement @RequireTimeSortedInput discussed in [1]. I see the document
>> [2] as a starting point for a discussion. There are several open
>> questions, which I believe can be resolved by this great community. :-)
>>
>> Jan
>>
>> [1] http://mail-archives.apache.org/mod_mbox/beam-dev/201905.mbox/browser
>>
>> [2] https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
>>