Hi Marcus,

At first glance, your pipeline looks correct. It should not be executed with a parallelism of one unless you specified that explicitly. Which time semantics are you using? If it is event time, I would check your timestamp and watermark assignment. You could also check in the web frontend which operator is executed with which parallelism. By the way, according to the JavaDocs of reduce(): "Sliding time windows will aggregate on the granularity of the slide interval", so it is called multiple times.
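
To illustrate the slide granularity, here is a minimal plain-Scala sketch (no Flink dependency) of how a sliding window assigner can map a timestamp to windows. It assumes epoch-aligned windows with no offset and non-negative timestamps; the names SlidingWindows and windowStarts are made up for illustration, and the 30 minute / 5 minute values are the ones from your description.

```scala
object SlidingWindows {
  val MinuteMs: Long = 60L * 1000L

  /** Start timestamps of every window [start, start + size) that contains `timestamp`.
    * Assumes non-negative timestamps and windows aligned to the epoch (no offset). */
  def windowStarts(timestamp: Long, size: Long, slide: Long): List[Long] = {
    // Latest slide-aligned start that is <= timestamp.
    val last = timestamp - (timestamp % slide)
    // Walk backwards one slide at a time while the window still covers the timestamp.
    Iterator.iterate(last)(_ - slide).takeWhile(_ > timestamp - size).toList
  }

  def main(args: Array[String]): Unit = {
    val size  = 30 * MinuteMs
    val slide = 5 * MinuteMs
    val ts    = 62 * MinuteMs // hypothetical event, 62 minutes after the epoch
    windowStarts(ts, size, slide).foreach { start =>
      println(s"window [${start / MinuteMs} min, ${(start + size) / MinuteMs} min)")
    }
  }
}
```

With these values every element falls into size / slide = 6 overlapping windows, and one of them closes every 5 minutes. The window boundaries depend only on the timestamp, not on the key.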

Regards,
Timo


On 9/29/17 at 8:56 PM, Marcus Clendenin wrote:


I have a job that is performing an aggregation over a time window. This windowing is supposed to be happening by key, but the output I am seeing is creating an overall window on everything coming in. Is this happening because I am doing a map of the data before I am running the keyBy command? This is a representation of what I am running

val stream = env
  .addSource(kafkaConsumer)

// filter out bad json

val jsonDeserializer = new JSONDeserializationSchema()
val filteredStream = stream.filter(text => {
  try {
    jsonDeserializer.deserialize(text.getBytes)
    true
  }
  catch {
    case e: Exception => false
  }
})
val kafkaStream = filteredStream.map(text => jsonDeserializer.deserialize(text.getBytes))

//method used to filter json not meeting the expected requests
val filteredJsonStream = filterIncorrectJson(kafkaStream)

//method used to map Json to input object

val mappedStream = mapJsonToObject(filteredJsonStream)

// pull key out of object

val keyedStream = mappedStream.keyBy(_.key)

// add window

val windowedStream = keyedStream.timeWindow(windowSize, windowSlide)

// reduce to aggregates

val reducedStream = windowedStream.reduce(aggregateData())


I am pulling in data from Kafka as a String, mapping it to my data model, pulling out the key, applying a time window with a 30 minute size and a 5 minute slide, and doing an aggregation. I am expecting the aggregation to happen on a time window that is separate for each key, but it is happening every 5 minutes for all keys.
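
For comparison, the following plain-Scala sketch (no Flink involved) mimics what keyBy followed by a 30 minute / 5 minute sliding window and a reduce would compute. It is only a rough model under stated assumptions: windows are aligned to the epoch rather than to the first element of each key, timestamps are non-negative, and the Event type, the sample data, and the summing reduce are hypothetical stand-ins for the real data model and aggregateData().

```scala
object KeyedSlidingSum {
  // Hypothetical stand-in for the mapped input object.
  final case class Event(key: String, timestamp: Long, value: Long)

  val MinuteMs: Long = 60L * 1000L

  /** Starts of all epoch-aligned sliding windows that contain `timestamp`. */
  def windowStarts(timestamp: Long, size: Long, slide: Long): List[Long] = {
    val last = timestamp - (timestamp % slide)
    Iterator.iterate(last)(_ - slide).takeWhile(_ > timestamp - size).toList
  }

  /** Sums `value` per (key, window start): one result per key and per window. */
  def aggregate(events: Seq[Event], size: Long, slide: Long): Map[(String, Long), Long] =
    events
      .flatMap(e => windowStarts(e.timestamp, size, slide).map(start => ((e.key, start), e.value)))
      .groupBy(_._1)
      .map { case (keyAndWindow, pairs) => keyAndWindow -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("a", 61 * MinuteMs, 1L),
      Event("b", 62 * MinuteMs, 10L),
      Event("a", 67 * MinuteMs, 2L)
    )
    aggregate(events, 30 * MinuteMs, 5 * MinuteMs).toSeq.sortBy(_._1).foreach {
      case ((key, start), sum) => println(s"key=$key windowStart=${start / MinuteMs}min sum=$sum")
    }
  }
}
```

In this model the sums stay separate per key, but every key shares the same window boundaries, so results for all keys appear at the same 5 minute marks. That may be what the output looks like here.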

