Hello!
I have a datastream like this:
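import java.util.concurrent.TimeUnit

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.readTextFile("events.log")
  .map(event => StopFactory(event)) // Stop is my own class; StopFactory builds one from each file line
  .assignTimestampsAndWatermarks(stopEventTimeExtractor) // extracts the timestamp from a field of each Stop
  .keyBy("mediaResource.contentId")
  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS)))
  .trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.MINUTES)))
  .reduce(
    // incremental pre-aggregation: merge each new Stop into the accumulated one
    (eventA: Stop, _: Stop) => {
      eventA.addVisualization()
      println("********** REDUCING **********")
      eventA
    },
    // window function: called on every trigger firing with the reduced result
    (key: Tuple, window: TimeWindow, input: Iterable[Stop], out: Collector[Stop]) => {
      val stop = input.iterator.next()
      println("------------- PROCESSING ----------------")
      if (stop.changed) {
        stop.changed = false
        out.collect(stop)
      }
    })
  .print()

env.execute()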
When I run this, I see that all of the reduce calls are executed first, and only
at the end does the processing (window) function start to execute.
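The ordering described above is what one would expect if the watermark does not advance while the file is being read: event-time triggers fire only when the watermark passes a fire time, so every element can be reduced before any firing happens. The plain-Scala model below (no Flink dependency) contrasts a watermark that advances with every element against one that only arrives at the end of a bounded input. It is a simplified illustration, not Flink's trigger code and not a diagnosis of the job above; `WatermarkFiringModel`, `run` and its parameters are hypothetical, and it assumes each element registers the next interval boundary after its timestamp as a fire time.

```scala
import scala.collection.mutable

// Simplified model of event-time firing; NOT Flink's actual ContinuousEventTimeTrigger code.
object WatermarkFiringModel {
  /** Returns the steps in execution order: "reduce" for every element and
    * "fire@t" whenever the watermark passes a pending fire time t.
    * Hypothetical simplification: each element registers the next interval
    * boundary after its timestamp as a fire time.
    */
  def run(timestamps: Seq[Long], intervalMs: Long, watermarkPerElement: Boolean): Vector[String] = {
    val log = Vector.newBuilder[String]
    val pending = mutable.SortedSet.empty[Long] // registered fire times, ascending

    // Fire every pending time the watermark has reached, in ascending order.
    def advanceWatermark(watermark: Long): Unit = {
      val due = pending.takeWhile(_ <= watermark).toList
      due.foreach(t => log += s"fire@$t")
      pending --= due
    }

    timestamps.foreach { ts =>
      log += "reduce" // the pre-aggregating reduce runs as soon as an element arrives
      pending += ts - ts % intervalMs + intervalMs
      // Assumption: in-order input, so the watermark can equal the element's timestamp.
      if (watermarkPerElement) advanceWatermark(ts)
    }
    advanceWatermark(Long.MaxValue) // a bounded input ends with a final maximal watermark
    log.result()
  }
}
```

With timestamps 0, 60000, 310000 and 620000 ms and a 300000 ms (5-minute) interval, the per-element variant interleaves `fire@...` steps with the `reduce` steps, while the end-only variant produces four `reduce` steps followed by all three firings.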
This ordering is a problem for me because I want to add a sink function that
outputs the results of the whole daily aggregation every 5 minutes, and the
source should be either Kafka (I haven't had any problem with Kafka) or a set of
large files.
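To make the intended output concrete: a daily window with 5-minute partial results should, at each 5-minute event-time boundary, emit the running aggregate for the day so far. The plain-Scala reference model below (no Flink) computes those snapshots directly, which could be used to check a streaming job's output. `PartialDailyCounts`, `snapshots` and the `(contentId, timestamp)` input shape are hypothetical, and counting events is only an assumed stand-in for what `addVisualization()` accumulates.

```scala
// Hypothetical reference model (plain Scala, no Flink) of the intended output.
object PartialDailyCounts {
  /** For every `intervalMs` boundary of the window [windowStart, windowStart + windowMs),
    * returns the number of events per key with a timestamp before that boundary.
    */
  def snapshots(events: Seq[(String, Long)], windowStart: Long, windowMs: Long,
                intervalMs: Long): Seq[(Long, Map[String, Int])] = {
    val windowEnd = windowStart + windowMs
    // Only events inside the (end-exclusive) window contribute.
    val inWindow = events.filter { case (_, ts) => ts >= windowStart && ts < windowEnd }
    (windowStart + intervalMs to windowEnd by intervalMs).map { boundary =>
      val counts = inWindow
        .filter { case (_, ts) => ts < boundary } // events seen up to this boundary
        .groupBy { case (key, _) => key }
        .map { case (key, hits) => key -> hits.size }
      boundary -> counts
    }
  }
}
```

With `windowMs = 24 * 60 * 60 * 1000L` and `intervalMs = 5 * 60 * 1000L` this yields 288 snapshots per day, the last of which is the complete daily aggregate.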
If I replace the ContinuousEventTimeTrigger with a CountTrigger, I see that it
runs the processing and the sink every "x" events, but again only at the end of
the file, after every reduce has been executed.
How could I manage this daily aggregation, emitting the partial aggregation
every 5 minutes, when the source is a file?
Thank you!!
Marta