Hi Rakesh Kumar,
I think you can use the interval join, e.g.:

orderStream
    .keyBy(<KeySelector>)
    .intervalJoin(invoiceStream.keyBy(<KeySelector>))
    .between(Time.minutes(-5), Time.minutes(5))
    .process(<ProcessJoinFunction>)

For the semantics of the interval join and a detailed usage description, please refer to:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/joining.html#interval-join
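
To make the join condition concrete, here is a minimal plain-Java sketch of it (no Flink dependency). It only illustrates the rule that an order and an invoice with the same key are paired when orderTs + lowerBound <= invoiceTs <= orderTs + upperBound, with both bounds inclusive by default. The class name IntervalJoinSketch, the Event type, and the sample timestamps are hypothetical and not part of Flink; a real job would use the operator shown above rather than nested loops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IntervalJoinSketch {

    // Hypothetical event type: a join key (order_id) and an event-time timestamp in ms.
    public static final class Event {
        public final String orderId;
        public final long timestampMs;

        public Event(String orderId, long timestampMs) {
            this.orderId = orderId;
            this.timestampMs = timestampMs;
        }
    }

    // Pairs every order with every invoice that has the same key and whose timestamp
    // lies in [orderTs + lowerMs, orderTs + upperMs] (both bounds inclusive).
    public static List<String> join(List<Event> orders, List<Event> invoices,
                                    long lowerMs, long upperMs) {
        List<String> out = new ArrayList<>();
        for (Event order : orders) {
            for (Event invoice : invoices) {
                boolean sameKey = order.orderId.equals(invoice.orderId);
                boolean inInterval = invoice.timestampMs >= order.timestampMs + lowerMs
                        && invoice.timestampMs <= order.timestampMs + upperMs;
                if (sameKey && inInterval) {
                    out.add(order.orderId + ":" + order.timestampMs + "->" + invoice.timestampMs);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long fiveMin = 5 * 60 * 1000L;
        List<Event> orders = Arrays.asList(new Event("o1", 0L), new Event("o2", 0L));
        List<Event> invoices = Arrays.asList(
                new Event("o1", 4 * 60 * 1000L),   // 4 minutes after its order: joined
                new Event("o2", 6 * 60 * 1000L));  // 6 minutes after its order: dropped
        System.out.println(join(orders, invoices, -fiveMin, fiveMin)); // prints [o1:0->240000]
    }
}
```

Note that a lower bound of -5 minutes also matches invoices that arrive up to 5 minutes before the order. If invoices can only follow their order, a lower bound of 0 would narrow the match accordingly.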

Hope this helps. Any feedback is welcome!

Best,
Jincheng


On Thu, Dec 6, 2018 at 7:10 PM, Rakesh Kumar <rakkukumar2...@gmail.com> wrote:

> Hi,
> I have two data sources, one for order data and another for invoice
> data. I am pushing both to Kafka in JSON form. I want to delay the order
> data by 5 minutes because invoice data arrives only after the order data
> is generated. For that, I have written a Flink program that reads both
> streams from Kafka, applies watermarks, and delays the order data by
> 5 minutes. After applying watermarks, I want to join the two streams on
> order_id, which is present in both the order and invoice data. After
> joining, I want to push the result to a different Kafka topic.
>
> However, I am not able to join these data streams with the 5-minute
> delay, and I cannot figure out why.
>
> I am attaching my Flink program and its dependencies below.
>
