Hi,
A bit of background: I have a stream of customer purchase transactions
that I read from a Kafka topic. I want to aggregate the number of products
each customer has purchased within a particular duration (say 10 seconds)
and write the result to a sink.
I am using session windows to achieve the above.
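To make the intent concrete, here is a minimal plain-Java sketch (not
Flink code) of the per-customer result I am after: purchases that are
close together in time form one session, and I want the purchase count
for each session. The class name SessionCountSketch and the method
sessionCounts are made up for illustration, and the boundary handling is
a simplification that may not match how Flink merges session windows.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: hypothetical helper, not part of Flink or of my job.
class SessionCountSketch {

    // Takes the event timestamps (in ms, ascending) of ONE customer and
    // returns the number of purchases in each session. A new session
    // starts when the distance to the previous event exceeds gapMs.
    static List<Integer> sessionCounts(long[] sortedTimestampsMs, long gapMs) {
        List<Integer> counts = new ArrayList<>();
        int current = 0;
        for (int i = 0; i < sortedTimestampsMs.length; i++) {
            boolean gapExceeded = i > 0
                    && sortedTimestampsMs[i] - sortedTimestampsMs[i - 1] > gapMs;
            if (gapExceeded) {
                // Close the previous session and start a new one.
                counts.add(current);
                current = 0;
            }
            current++;
        }
        if (current > 0) {
            counts.add(current); // close the last open session
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three purchases close together, then two more after a long pause.
        long[] purchases = {0L, 1_000L, 2_000L, 9_000L, 10_000L};
        System.out.println(sessionCounts(purchases, 5_000L));
    }
}
```

In the real job this grouping should come from the session window itself,
per customer key, driven by event time and watermarks.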
For test purposes, I have mocked up a customer stream and applied session
windows as shown below.

    StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Customer> customerStream =
            environment.addSource(new CustomerGenerator());
    customerStream
            //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
            .keyBy(Customer::getIdentifier)
            .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
            .trigger(EventTimeTrigger.create())
            .evictor(new CustomerEvictor())
            .process(new CustomAggregateFunction())
            .print();

My watermark assigner looks like:

    public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
        static final Logger logger =
                LoggerFactory.getLogger(WaterMarkAssigner.class);

        @Override
        public WatermarkGenerator<Customer> createWatermarkGenerator(
                WatermarkGeneratorSupplier.Context context) {
            return new CustomWatermarkGenerator();
        }
    }

I notice that the evictor and the aggregation function are called only
once, for the first customer in the stream.
The data stream generates customers at 1-second intervals, and there are
5 customer keys for which it generates transactions.
Am I doing something wrong with the above?
I want to be able to capture the event each time a transaction is added
to or removed from the window so that I can perform the aggregation.
Please advise.