Hello! I have a datastream like this:

    env.readTextFile("events.log")
      .map(event => StopFactory(event)) // I have defined a Stop class and this creates an instance from the file line
      .assignTimestampsAndWatermarks(stopEventTimeExtractor) // extract the timestamp from a field of each instance
      .keyBy("mediaResource.contentId")
      .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS)))
      .trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.MINUTES)))
      .reduce(
        (eventA: Stop, _: Stop) => {
          eventA.addVisualization()
          println("********** REDUCING **********")
          eventA
        },
        (key: Tuple, window: TimeWindow, input: Iterable[Stop], out: Collector[Stop]) => {
          val stop = input.iterator.next()
          println("------------- PROCESSING ----------------")
          if (stop.changed) {
            stop.changed = false
            out.collect(stop)
          }
        })
      .print()

If I execute this, I see that all the reduce calls are executed first, and only at the end does the window function (the "PROCESSING" part) start to execute.

This is a problem for me because I want to add a sink function that outputs results every 5 minutes from the whole daily aggregation, and the goal is to have as the source either Kafka (I didn't have any problem with Kafka) or a bunch of large files.

If I replace the ContinuousEventTimeTrigger with a CountTrigger, I see that it does the process and sink every "x" events, but only at the end of the file, after every reduce has been executed.

How could I manage this daily aggregation, with a 5-minute trigger on the partial aggregation, when the source is a file?

Thank you!!

Marta