Hi,
I think you first have to convert back to a DataStream using .select() or
.flatSelect(). But Till should know more about this, maybe he can help.

Cheers,
Aljoscha

On Thu, 2 Jun 2016 at 19:19 Kanstantsin Kamkou <kkam...@gmail.com> wrote:

> Hi Aljoscha! Is it possible somehow to use the RichXFunction in CEP?
> The task is pretty similar, but I have to ignore once the next
> triggered event for the same key.
>
>
> On Wed, Jun 1, 2016 at 2:54 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> > Hi,
> > yeah, in that case per-key watermarks would be useful for you. I won't be
> > possible to add such a feature, though, due to the (possibly) dynamic
> nature
> > of the key space and how watermark tracking works.
> >
> > You should be able to implement it with relatively low overhead using a
> > RichFlatMapFunction and keyed state. This is the relevant section of the
> > doc:
> >
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#using-the-keyvalue-state-interface
> .
> >
> > We are also in the process of improving our windowing system, especially
> > when it comes to late data, cleanup and trigger semantics. You can have a
> > look here if you're interested:
> >
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
> .
> >
> > Best,
> > Aljoscha
> >
> > On Tue, 31 May 2016 at 14:36 <leon_mcl...@tutanota.com> wrote:
> >>
> >> Hi Aljoscha,
> >>
> >> thanks for the speedy reply.
> >>
> >> I am processing measurements delivered by smart meters. I use windows to
> >> gather measurements and calculate values such as average consumption.
> The
> >> key is simply the meter ID.
> >>
> >> The challenge is that meters may undergo network partitioning, under
> which
> >> they fall back to local buffering. The data is then transmitted once
> >> connectivity has been re-established. I am using event time to obtain
> >> accurate calculations.
> >>
> >> If a specific meter goes offline, and the watermark progresses to the
> next
> >> window for an operator instance, then all late data will be discarded
> once
> >> that meter is online again, until it has caught up to the event time.
> This
> >> is because I am using a custom EventTimeTrigger implementation that
> discards
> >> late elements. The reason for that is because Flink would otherwise
> >> immediately evaluate the window upon receiving a late element, which is
> a
> >> problem since my calculations (e.g. the average consumption) depend on
> >> multiple elements. I cannot calculate averages with that single late
> >> element.
> >>
> >> Each individual meter guarantees in-order transmission of measurements.
> If
> >> watermarks progressed per key, then i would never have late elements
> because
> >> of that guarantee. I would be able to accurately calculate averages,
> with
> >> the trade-off that my results would arrive sporadically from the same
> >> operator instance.
> >>
> >> I suppose I could bypass the use of windows by implementing a stateful
> map
> >> function that mimics windows to a certain degree. I implemented
> something
> >> similar in Storm, but the amount of application logic required is
> >> substantial.
> >>
> >> I completely understand why Flink evaluates a window on a late element,
> >> since there is no other way to know when to evaluate the window as event
> >> time has already progressed.
> >>
> >> Perhaps there is a way to gather/redirect late elements?
> >>
> >> Regards
> >> Leon
> >>
> >> 31. May 2016 13:37 by aljos...@apache.org:
> >>
> >>
> >> Hi,
> >> I'm afraid this is impossible with the current design of Flink. Might I
> >> ask what you want to achieve with this? Maybe we can come up with a
> >> solution.
> >>
> >> -Aljoscha
> >>
> >> On Tue, 31 May 2016 at 13:24 <leon_mcl...@tutanota.com> wrote:
> >>>
> >>> My use case primarily concerns applying transformations per key, with
> the
> >>> keys remaining fixed throughout the topology. I am using event time
> for my
> >>> windows.
> >>>
> >>> The problem i am currently facing is that watermarks in windows
> propagate
> >>> per operator instance, meaning the operator event time increases for
> all
> >>> keys that the operator is in charge of. I wish for watermarks to
> progress
> >>> per key, not per operator instance.
> >>>
> >>> Is this easily possible? I was unable to find an appropriate solution
> >>> based on existing code recipes.
> >>>
> >>> Greetings
> >>> Leon
>

Reply via email to