I am reading stats from Kinesis, deserializing them into a stat POJO, and then aggregating them in a window with no ProcessWindowFunction defined, like this:

    timestampedStats
        .keyBy(v -> v.groupKey())
        .window(TumblingEventTimeWindows.of(Time.seconds(appCfg.getWindowSize())))
        .aggregate(new ImpactAggregator(appCfg.getSmoothingRange(), appCfg.getThreshold()))
        .sinkTo(getKinesisProducer(appCfg.getAcuWindowsStreamName(), appCfg.getAwsRegion()))
        .name("Kinesis Producer");

As part of the aggregation logic, I look for threshold violations, where some field in each metric is tested against a fixed threshold. For each stat that violates the threshold during the window (300 seconds), I increment a counter in the accumulator and record some other metadata associated with that stat. If there are violations, I want to process the window by serializing its contents to JSON and publishing them to Kinesis.

What I want is to NOT serialize a window that has NO violations in its accumulator. There is no need to send a message when no bad conditions are observed.

- How can I just throw away a window and NOT serialize it when it is ready to be processed?
- How do I hook into some event that allows me to do that?
- Do I need to implement a ProcessWindowFunction along with my AggregateFunction and handle this as part of the process window function?

I have created a class that implements SerializationSchema to do the serialization, but its serialize() method has to return a byte[] containing valid JSON. I think the solution lies somewhere else, at a point where I can elect NOT to process the window at all, so that serialize() never gets called.

Thank you
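
For illustration, the behaviour being asked for can be sketched without any Flink dependency. This is only a sketch under assumptions: `WindowSummary` and its `violationCount` field are hypothetical stand-ins for whatever `ImpactAggregator` actually returns, and `java.util.function.Consumer` stands in for Flink's `Collector`. The idea is that a window whose result is never emitted never reaches the sink, so `serialize()` is never called for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Flink-free sketch: Consumer<T> plays the role of Flink's Collector<T>.
final class EmitOnlyViolations {

    // Hypothetical result type; the real output type of ImpactAggregator is not shown in the post.
    static final class WindowSummary {
        final String groupKey;
        final int violationCount;

        WindowSummary(String groupKey, int violationCount) {
            this.groupKey = groupKey;
            this.violationCount = violationCount;
        }
    }

    // Mirrors what the body of a window process function could do:
    // forward the pre-aggregated summary only if it recorded a violation.
    static void process(WindowSummary summary, Consumer<WindowSummary> out) {
        if (summary.violationCount > 0) {
            out.accept(summary);
        }
        // Otherwise nothing is emitted, so nothing downstream sees this window.
    }

    public static void main(String[] args) {
        List<WindowSummary> emitted = new ArrayList<>();
        process(new WindowSummary("clean-key", 0), emitted::add);
        process(new WindowSummary("noisy-key", 3), emitted::add);
        System.out.println(emitted.size() + " window(s) emitted, first key: "
                + emitted.get(0).groupKey);
    }
}
```

In the actual job, the same check could plausibly live in one of two places: in a `ProcessWindowFunction` passed as the second argument to `.aggregate(...)`, which calls `out.collect(...)` only when the count is positive, or in a `.filter(...)` placed between `.aggregate(...)` and `.sinkTo(...)`. Which of the two fits better depends on whether window metadata such as the window end time is also needed in the output.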