Hello!

I have a datastream like this:

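import java.util.concurrent.TimeUnit
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

env.readTextFile("events.log")
  .map(event => StopFactory(event)) // I have defined a Stop class; this creates an instance from a file line
  .assignTimestampsAndWatermarks(stopEventTimeExtractor) // extracts the timestamp from a field of each instance
  .keyBy("mediaResource.contentId")
  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS)))
  .trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.MINUTES)))
  .reduce(
    // incremental pre-aggregation: fold each new event into the running Stop
    (eventA: Stop, _: Stop) => {
      eventA.addVisualization()
      println("********** REDUCING **********")
      eventA
    },
    // window function: called on every trigger firing with the single pre-aggregated Stop
    (key: Tuple, window: TimeWindow, input: Iterable[Stop], out: Collector[Stop]) => {
      val stop = input.iterator.next()
      println("------------- PROCESSING ----------------")
      // only emit when the aggregate changed since the last firing
      if (stop.changed) {
        stop.changed = false
        out.collect(stop)
      }
    })
  .print()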

When I run this, I see that all the reduce calls are executed first, and only at the
end does the window function (the processing part) start to run.
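A toy model may help reason about this ordering. It is not Flink code and assumes a simplified trigger: as with Flink's ContinuousEventTimeTrigger, a firing happens only when the watermark passes a registered event-time timer aligned to the 5-minute interval. If the whole file is read before the watermark advances (one possible explanation for a fast, bounded file source, assuming periodic watermarks; at the end of a bounded input a final maximal watermark is emitted), every reduce runs before any firing. The names TriggerModel, Event, Watermark and run are hypothetical.

```scala
// Toy model, NOT Flink code: one key, one daily window, and a 5-minute
// continuous event-time trigger. It only shows the order in which "reduce"
// and "fire" happen depending on where watermarks appear in the stream.
object TriggerModel {
  sealed trait Input
  final case class Event(tsMinutes: Long) extends Input     // a Stop event, timestamp in minutes of the day
  final case class Watermark(tsMinutes: Long) extends Input // event-time progress signal

  val IntervalMinutes = 5L
  val DayMinutes = 1440L // end of the daily window

  /** Returns the actions in the order the modelled window operator performs them. */
  def run(inputs: Seq[Input]): Vector[String] = {
    var log = Vector.empty[String]
    var count = 0L               // the reduced aggregate: number of events so far
    var nextFire = Long.MaxValue // earliest registered 5-minute timer (none yet)
    for (input <- inputs) input match {
      case Event(ts) =>
        count += 1
        log :+= s"REDUCE $count"
        // timer aligned to the end of the event's 5-minute slot; keep the earliest one
        val boundary = ts - ts % IntervalMinutes + IntervalMinutes
        nextFire = math.min(nextFire, boundary)
      case Watermark(wm) =>
        // timers fire only once the watermark has passed them, never past the day's end
        while (nextFire <= wm && nextFire <= DayMinutes) {
          log :+= s"FIRE $count"
          nextFire += IntervalMinutes // re-arm for the next 5-minute slot
        }
    }
    log
  }
}
```

With events at minutes 1, 7 and 12 and a watermark right after each event, run returns REDUCE 1, REDUCE 2, FIRE 2, REDUCE 3, FIRE 3. With a single watermark at the end of the input, all three REDUCE entries come before every FIRE entry, which matches the ordering described above.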

This is a problem for me because I want to add a sink function that outputs the
results of the whole daily aggregation every 5 minutes, and the source will be
either Kafka (I had no problems with Kafka) or a set of large files.

If I replace the ContinuousEventTimeTrigger with a CountTrigger, I see that the
processing and the sink do run every "x" events, but again only at the end of the
file, after every reduce has been executed.

How can I manage this daily aggregation, emitting the partial aggregate every
5 minutes, when the source is a file?

Thank you!!

Marta

Reply via email to