Hi,

I think you first have to convert back to a DataStream using .select() or .flatSelect(). But Till should know more about this; maybe he can help.
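Once the matches are back in a DataStream, the requirement in the question below ("ignore once the next triggered event for the same key") becomes a keyed-state problem. The following is a minimal plain-Java sketch, not Flink code. It assumes the intended behaviour is "forward a match, then drop exactly the next match for the same key". The class `SuppressNextPerKey` and its `process` method are hypothetical. The `HashMap` stands in for the per-key `ValueState<Boolean>` that a `RichFlatMapFunction` on a keyed stream would hold.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java stand-in for a keyed RichFlatMapFunction placed after
// select()/flatSelect(). The map plays the role of per-key ValueState<Boolean>.
public class SuppressNextPerKey {
    // Per-key flag: true means "drop the next match for this key".
    private final Map<String, Boolean> skipNext = new HashMap<>();

    // Returns true if the match for this key should be forwarded downstream.
    public boolean process(String key) {
        if (skipNext.getOrDefault(key, false)) {
            skipNext.put(key, false); // drop exactly one match, then disarm
            return false;
        }
        skipNext.put(key, true); // forward this match and arm the skip
        return true;
    }

    public static void main(String[] args) {
        SuppressNextPerKey filter = new SuppressNextPerKey();
        String[] matchKeys = {"a", "a", "b", "a", "b", "b"};
        List<String> forwarded = new ArrayList<>();
        for (String key : matchKeys) {
            if (filter.process(key)) {
                forwarded.add(key);
            }
        }
        System.out.println(forwarded); // prints [a, b, a, b]
    }
}
```

In a Flink job, the flag would live in keyed state instead of a field map. That way it is scoped to the current key and checkpointed with the operator.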

Cheers,
Aljoscha

On Thu, 2 Jun 2016 at 19:19 Kanstantsin Kamkou <kkam...@gmail.com> wrote:

> Hi Aljoscha! Is it possible somehow to use the RichXFunction in CEP?
> The task is pretty similar, but I have to ignore once the next
> triggered event for the same key.
>
>
> On Wed, Jun 1, 2016 at 2:54 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> > Hi,
> > yeah, in that case per-key watermarks would be useful for you. It won't be
> > possible to add such a feature, though, due to the (possibly) dynamic
> > nature of the key space and how watermark tracking works.
> >
> > You should be able to implement it with relatively low overhead using a
> > RichFlatMapFunction and keyed state. This is the relevant section of the
> > doc:
> > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#using-the-keyvalue-state-interface
> >
> > We are also in the process of improving our windowing system, especially
> > when it comes to late data, cleanup and trigger semantics. You can have a
> > look here if you're interested:
> > https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
> >
> > Best,
> > Aljoscha
> >
> > On Tue, 31 May 2016 at 14:36 <leon_mcl...@tutanota.com> wrote:
> >>
> >> Hi Aljoscha,
> >>
> >> thanks for the speedy reply.
> >>
> >> I am processing measurements delivered by smart meters. I use windows to
> >> gather measurements and calculate values such as average consumption.
> >> The key is simply the meter ID.
> >>
> >> The challenge is that meters may undergo network partitioning, under
> >> which they fall back to local buffering. The data is then transmitted
> >> once connectivity has been re-established. I am using event time to
> >> obtain accurate calculations.
> >>
> >> If a specific meter goes offline, and the watermark progresses to the
> >> next window for an operator instance, then all late data will be
> >> discarded once that meter is online again, until it has caught up to the
> >> event time. This is because I am using a custom EventTimeTrigger
> >> implementation that discards late elements. The reason is that Flink
> >> would otherwise immediately evaluate the window upon receiving a late
> >> element, which is a problem since my calculations (e.g. the average
> >> consumption) depend on multiple elements. I cannot calculate averages
> >> with that single late element.
> >>
> >> Each individual meter guarantees in-order transmission of measurements.
> >> If watermarks progressed per key, then I would never have late elements
> >> because of that guarantee. I would be able to accurately calculate
> >> averages, with the trade-off that my results would arrive sporadically
> >> from the same operator instance.
> >>
> >> I suppose I could bypass the use of windows by implementing a stateful
> >> map function that mimics windows to a certain degree. I implemented
> >> something similar in Storm, but the amount of application logic required
> >> is substantial.
> >>
> >> I completely understand why Flink evaluates a window on a late element,
> >> since there is no other way to know when to evaluate the window as event
> >> time has already progressed.
> >>
> >> Perhaps there is a way to gather/redirect late elements?
> >>
> >> Regards
> >> Leon
> >>
> >> 31. May 2016 13:37 by aljos...@apache.org:
> >>
> >>
> >> Hi,
> >> I'm afraid this is impossible with the current design of Flink. Might I
> >> ask what you want to achieve with this? Maybe we can come up with a
> >> solution.
> >>
> >> -Aljoscha
> >>
> >> On Tue, 31 May 2016 at 13:24 <leon_mcl...@tutanota.com> wrote:
> >>>
> >>> My use case primarily concerns applying transformations per key, with
> >>> the keys remaining fixed throughout the topology. I am using event time
> >>> for my windows.
> >>>
> >>> The problem I am currently facing is that watermarks in windows
> >>> propagate per operator instance, meaning the operator event time
> >>> increases for all keys that the operator is in charge of. I wish for
> >>> watermarks to progress per key, not per operator instance.
> >>>
> >>> Is this easily possible? I was unable to find an appropriate solution
> >>> based on existing code recipes.
> >>>
> >>> Greetings
> >>> Leon
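The alternative discussed in this thread is a RichFlatMapFunction with keyed state that mimics windows. It relies on the stated guarantee that each meter delivers its measurements in timestamp order. Under that guarantee, each key's latest timestamp can act as that key's own watermark.

The block below is a minimal plain-Java sketch under those assumptions, not the Flink API:

* `PerKeyWindowAverage`, `WindowAcc` and the tumbling window size are hypothetical.
* The `HashMap` stands in for keyed state.
* Appending to `results` stands in for emitting through a `Collector`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: tumbling event-time windows whose "watermark" is each
// key's own latest timestamp. Assumes every key (meter) delivers its elements
// in timestamp order. The map stands in for Flink keyed state.
public class PerKeyWindowAverage {

    // Accumulator for the single open window of one key.
    static final class WindowAcc {
        final long windowStart;
        double sum;
        long count;

        WindowAcc(long windowStart) {
            this.windowStart = windowStart;
        }
    }

    private final long windowSizeMs;
    private final Map<String, WindowAcc> openWindows = new HashMap<>();
    // Stands in for out.collect(...) on a Flink Collector.
    private final List<String> results = new ArrayList<>();

    public PerKeyWindowAverage(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    public void process(String meterId, long timestampMs, double value) {
        // Start of the tumbling window this element belongs to.
        long start = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        WindowAcc acc = openWindows.get(meterId);
        if (acc != null && start < acc.windowStart) {
            // Cannot happen if the per-key ordering assumption holds.
            throw new IllegalArgumentException("out-of-order element for " + meterId);
        }
        if (acc != null && start > acc.windowStart) {
            // This key's own event time has passed the end of its open window: fire it.
            results.add(meterId + "@" + acc.windowStart + "=" + (acc.sum / acc.count));
            acc = null;
        }
        if (acc == null) {
            acc = new WindowAcc(start);
            openWindows.put(meterId, acc);
        }
        acc.sum += value;
        acc.count++;
    }

    public List<String> results() {
        return results;
    }

    public static void main(String[] args) {
        PerKeyWindowAverage w = new PerKeyWindowAverage(10);
        w.process("m1", 1, 2.0);
        w.process("m1", 5, 4.0);
        w.process("m1", 12, 6.0); // fires m1's window [0, 10)
        w.process("m1", 25, 8.0); // fires m1's window [10, 20)
        // m2 was offline and now replays its buffered data; it is not late here
        w.process("m2", 3, 10.0);
        w.process("m2", 7, 20.0);
        w.process("m2", 14, 1.0); // fires m2's window [0, 10)
        System.out.println(w.results()); // prints [m1@0=3.0, m1@10=6.0, m2@0=15.0]
    }
}
```

A key's window fires only when a later element for that same key arrives. As a result, each meter's results appear only as fast as that meter reports, which is the sporadic-output trade-off Leon describes. This sketch does not emit a window that is still open when a meter goes silent.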