Hi Marcus,
at first glance your pipeline looks correct. It should not be executed
with a parallelism of one unless you specified that explicitly. Which
time semantics are you using? If it is event time, I would check your
timestamp and watermark assignment. You can also check in the web
frontend which operator runs with which parallelism. Btw. according to
the JavaDocs of reduce(): "Sliding time windows will aggregate on the
granularity of the slide interval", so it is called multiple times.
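If it helps, the bounded out-of-orderness strategy behind Flink's
BoundedOutOfOrdernessTimestampExtractor boils down to tracking the
largest event timestamp seen so far and emitting a watermark that
trails it by the allowed lateness. A minimal, dependency-free sketch
of that logic (the Event type and the 10-second bound are illustrative,
not from your code):

```scala
// Sketch of the bounded-out-of-orderness watermark logic that
// Flink's BoundedOutOfOrdernessTimestampExtractor implements.
// Event and the concrete bound are made up for illustration.
case class Event(key: String, timestamp: Long)

class BoundedOutOfOrderness(maxOutOfOrdernessMs: Long) {
  // Initialized so the first watermark is Long.MinValue without underflow.
  private var maxSeenTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

  // Called once per element: track the largest event timestamp seen.
  def extractTimestamp(e: Event): Long = {
    maxSeenTimestamp = math.max(maxSeenTimestamp, e.timestamp)
    e.timestamp
  }

  // The watermark trails the largest timestamp by the allowed lateness,
  // so out-of-order events within the bound still reach their window.
  def currentWatermark: Long = maxSeenTimestamp - maxOutOfOrdernessMs
}
```

If the watermark never advances (e.g. timestamps are extracted from
the wrong field), event-time windows never fire, so this is worth
verifying first.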
Regards,
Timo
On 9/29/17 8:56 PM, Marcus Clendenin wrote:
I have a job that is performing an aggregation over a time window.
This windowing is supposed to be happening by key, but the output I am
seeing is creating an overall window on everything coming in. Is this
happening because I am doing a map of the data before I am running the
keyBy command? This is a representation of what I am running:
val stream = env
  .addSource(kafkaConsumer)

// filter out bad json
val jsonDeserializer = new JSONDeserializationSchema()
val filteredStream = stream.filter(text => {
  try {
    jsonDeserializer.deserialize(text.getBytes)
    true
  } catch {
    case e: Exception => false
  }
})
val kafkaStream = filteredStream.map(text =>
  jsonDeserializer.deserialize(text.getBytes))
// method used to filter json not meeting the expected requests
val filteredJsonStream = filterIncorrectJson(kafkaStream)
// method used to map Json to input object
val mappedStream = mapJsonToObject(filteredJsonStream)
// pull key out of object
val keyedStream = mappedStream.keyBy(_.key)
// add window
val windowedStream = keyedStream.timeWindow(windowSize, windowSlide)
// reduce to aggregates
val reducedStream = windowedStream.reduce(aggregateData())
I am pulling in data from Kafka as a String, mapping it to my data
model, pulling out the key, applying a time window with a 30-minute
window and 5-minute slide, and doing an aggregation. I am expecting
the aggregation to happen on a separate time window for each key, but
it is happening every 5 minutes for all keys.
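For reference, with a 30-minute window and a 5-minute slide, each
element belongs to windowSize / windowSlide = 6 overlapping windows
per key, and every slide interval some window closes, which matches
the every-5-minutes output described above. A minimal sketch of
sliding-window assignment (pure Scala; it mirrors the logic of Flink's
SlidingEventTimeWindows with zero offset, and the names are
illustrative):

```scala
// Sketch of sliding-window assignment: returns the [start, end)
// intervals (in ms) that contain the given element timestamp.
// Mirrors Flink's SlidingEventTimeWindows logic with zero offset.
def assignWindows(ts: Long, sizeMs: Long, slideMs: Long): Seq[(Long, Long)] = {
  // Start of the most recent window containing ts.
  val lastStart = ts - (ts % slideMs)
  // Walk backwards by the slide; every start > ts - size still covers ts.
  (lastStart to (ts - sizeMs + 1) by -slideMs).map(s => (s, s + sizeMs))
}
```

Because every element is assigned to six windows here, a reduce over a
keyed sliding window fires once per slide interval per key, which is
why output appears every 5 minutes even when keying works correctly.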