Hi Marta,

Do you mean you want to emit results every 5 minutes based on wall-clock
time (processing time)? If so, you should use ContinuousProcessingTimeTrigger
instead of ContinuousEventTimeTrigger, which emits results based on event
time.
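To illustrate, here is a minimal sketch of that change against the Flink 1.11/1.12 Scala DataStream API. The View case class, the "<contentId>,<epochMillis>" line format and the bounded-out-of-orderness extractor are hypothetical stand-ins for your Stop, StopFactory and stopEventTimeExtractor; the part that matters is the trigger line:

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger

// Hypothetical, simplified stand-in for the Stop class
case class View(contentId: String, timestamp: Long, count: Long)

object DailyAggregationJob {
  // Hypothetical line format: "<contentId>,<epochMillis>"
  def parseLine(line: String): View = {
    val Array(id, ts) = line.split(",")
    View(id.trim, ts.trim.toLong, 1L)
  }

  // Pre-aggregation: add up the counts of two partial results for the same key
  def merge(a: View, b: View): View = a.copy(count = a.count + b.count)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Event time must be enabled explicitly on Flink 1.11; deprecated since 1.12
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env
      .readTextFile("events.log")
      .map(line => parseLine(line))
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[View](Time.minutes(1)) {
          override def extractTimestamp(v: View): Long = v.timestamp
        })
      .keyBy(_.contentId)
      // The daily window itself is still assigned by event time...
      .window(TumblingEventTimeWindows.of(Time.days(1)))
      // ...but it fires every 5 minutes of wall-clock time; the window
      // contents are not purged, so each firing emits the running daily total
      .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))
      .reduce((a: View, b: View) => merge(a, b))
      .print()

    env.execute("daily aggregation with 5-minute processing-time trigger")
  }
}
```

One caveat worth checking with a file source: this trigger fires only on processing-time timers, so if the whole file is read in less than 5 minutes, the job may finish before the trigger has fired even once.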

Does that solve your problem?

Best,

Dawid

On 02/12/2020 20:54, ANON Marta wrote:
>
> Hello!
>
> I have a datastream like this:
>
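> import java.util.concurrent.TimeUnit
> import org.apache.flink.api.java.tuple.Tuple
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.util.Collector
>
> env.readTextFile("events.log")
>   // I have defined a Stop class and this creates an instance from the file line
>   .map(event => StopFactory(event))
>   // extract the timestamp from a field of each instance
>   .assignTimestampsAndWatermarks(stopEventTimeExtractor)
>   .keyBy("mediaResource.contentId")
>   .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS)))
>   .trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.MINUTES)))
>   .reduce(
>     // pre-aggregation: count one more visualization on the accumulated Stop
>     (eventA: Stop, _: Stop) => {
>       eventA.addVisualization()
>       println("********** REDUCING **********")
>       eventA
>     },
>     // window function: emit the pre-aggregated Stop only if it has changed
>     (key: Tuple, window: TimeWindow, input: Iterable[Stop], out: Collector[Stop]) => {
>       val stop = input.iterator.next()
>       println("------------- PROCESSING ----------------")
>       if (stop.changed) {
>         stop.changed = false
>         out.collect(stop)
>       }
>     })
>   .print()
>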
> If I execute this, I see that all the reduce calls are executed first,
> and only at the end does the process function part start to execute.
>
> This is a problem for me because I want to add a SinkFunction that
> outputs the results of the whole daily aggregation every 5 minutes, and
> the goal is to use either a Kafka source (I didn't have any problem with
> Kafka) or a bunch of large files.
>
> If I replace the ContinuousEventTimeTrigger with a CountTrigger, I see
> that it does the sink and processing every "x" events, but again only at
> the end of the file, after every reduce has been executed.
>
> How could I manage this daily aggregation with a 5-minute trigger for
> the partial aggregation when the source is a file?
>
> Thank you!!
>
> Marta
