Adding the code for CustomWatermarkGenerator:
.....
@Override
public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
    // Track the largest event time seen so far.
    currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
    // Periodically emit a watermark equal to the largest event time seen.
    watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
}
.....
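For comparison, here is a standalone model of what a periodic generator like this keeps track of. It has no Flink dependency, and `BoundedWatermarkModel` and its methods are hypothetical names. The formula copies the one used by Flink's built-in `BoundedOutOfOrdernessWatermarks`: the watermark is the largest timestamp seen, minus the allowed out-of-orderness, minus 1 ms, starting from a value that yields `Long.MIN_VALUE` before any event arrives. One difference worth noting: the built-in generator tracks the `eventTimestamp` argument of `onEvent` (the record timestamp produced by the strategy's timestamp assigner), whereas the snippet above reads `customer.getEventTime()` directly.

```java
// Standalone model (no Flink dependency) of a periodic watermark generator.
// Names are hypothetical; the arithmetic mirrors Flink's built-in
// bounded-out-of-orderness generator: maxTimestamp - bound - 1.
final class BoundedWatermarkModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermarkModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the first emitted watermark is Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Counterpart of onEvent: remember the largest timestamp seen
    // (in Flink, the eventTimestamp argument set by the timestamp assigner).
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Counterpart of onPeriodicEmit: the watermark value that would be emitted.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkModel model = new BoundedWatermarkModel(0);
        System.out.println(model.currentWatermark()); // Long.MIN_VALUE before any event
        for (long ts = 1_000; ts <= 5_000; ts += 1_000) {
            model.onEvent(ts);
        }
        System.out.println(model.currentWatermark()); // 4999
    }
}
```

The trailing -1 follows Flink's watermark semantics: a watermark of t declares that no more elements with a timestamp <= t are expected. With a zero bound, the model therefore stays 1 ms behind a generator that emits `currentMaxTimestamp` as-is.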
On Thu, May 6, 2021 at 1:33 AM Swagat Mishra wrote:
> Hi,
>
> A bit of background: I have a stream of customers who have purchased some
> products, and I read these transactions from a Kafka topic. I want to
> aggregate the number of products each customer has purchased within a
> particular duration (say, 10 seconds) and write the result to a sink.
>
> I am using session windows to achieve the above.
>
> For testing, I have mocked up a customer stream and applied session
> windows as follows:
>
> StreamExecutionEnvironment environment =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>
> customerStream
>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>         .keyBy(Customer::getIdentifier)
>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>         .trigger(EventTimeTrigger.create())
>         .evictor(new CustomerEvictor())
>         .process(new CustomAggregateFunction())
>         .print();
>
> My watermark assigner looks like:
>
> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>
>     @Override
>     public WatermarkGenerator<Customer> createWatermarkGenerator(
>             WatermarkGeneratorSupplier.Context context) {
>         return new CustomWatermarkGenerator();
>     }
> }
>
> I notice that the evictor and the aggregation function are called only
> once, for the first customer in the stream.
>
> The source generates one customer transaction every second, spread across
> 5 customer keys.
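A rough way to see how that cadence interacts with a 5-second session gap, assuming (the post does not say) that the generator visits the 5 keys in turn, so each key sees one event every 5 seconds. The standalone model below has no Flink dependency; `SessionMergeModel`, its methods, and the sample timestamps are hypothetical. Each event opens a window [ts, ts + gap), and windows that overlap or merely touch are merged, which is how I understand Flink's `TimeWindow.intersects` to behave. `EventTimeTrigger` fires a window once the watermark reaches end - 1, and the evictor and process function run only when the window fires.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model (no Flink dependency) of event-time session windows for ONE key.
// Class/method names and sample timestamps are hypothetical.
final class SessionMergeModel {

    // A session window covering [start, end) in milliseconds.
    static final class Session {
        final long start;
        final long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Watermark at which an event-time trigger would fire: end - 1.
        long fireWatermark() {
            return end - 1;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Timestamps must be sorted ascending. Each event opens [ts, ts + gap);
    // it joins the previous session when that session's end is >= ts
    // (overlapping or touching windows are merged, an assumption noted above).
    static List<Session> sessions(long[] sortedTimestamps, long gapMillis) {
        List<Session> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            int lastIndex = result.size() - 1;
            if (lastIndex >= 0 && result.get(lastIndex).end >= ts) {
                // Extend the open session so it ends one gap after the newest event.
                result.set(lastIndex, new Session(result.get(lastIndex).start, ts + gapMillis));
            } else {
                result.add(new Session(ts, ts + gapMillis));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long gap = 5_000;
        // Hypothetical: one event every 5 s for this key.
        System.out.println(sessions(new long[] {0, 5_000, 10_000, 15_000}, gap));
        // prints [[0, 20000)]
        // Hypothetical: one event every 6 s for this key.
        System.out.println(sessions(new long[] {0, 6_000, 12_000}, gap));
        // prints [[0, 5000), [6000, 11000), [12000, 17000)]
    }
}
```

Under those assumptions, a 5-second cadence produces a single session that keeps growing while events arrive, so its trigger would not fire until the key's events stop or become more than 5 seconds apart; a 6-second cadence closes a session after every event.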
>
> Am I doing something wrong with the above?
>
> I want to capture an event each time a transaction is added to or removed
> from the window, so that I can perform the aggregation.
>
> Please advise.