Hi,

I am new to Flink. I have a question about this "rolling" fold function.

If its parallelism is larger than one, does the "rolling" order remain the
same? That is, does it always keep "1-2-3-4-5" as an increasing sequence?

Regards,

Min

------------------------------------------------------------------- 
FoldFunction 
---------------------------------------------------------------------------------------------------------------
A "rolling" fold on a keyed data stream with an initial value. Combines the 
current element with the last folded value and emits the new value.
A fold function that, when applied to the sequence (1,2,3,4,5), emits the
sequence "start-1", "start-1-2", "start-1-2-3", ...
DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
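The per-key behaviour of the rolling fold quoted above can be illustrated without Flink. The plain-Java sketch below uses hypothetical names (`RollingFoldSketch`, `Event`, `run`) and assumes a single upstream producer that emits elements in order. It also routes keys to simulated subtasks with a simplified hash; real Flink uses key groups. What it shows is that every element of one key lands in the same subtask, so that key's accumulator is updated in the order the elements arrive, no matter the parallelism. Flink itself only preserves order between each pair of sending and receiving subtasks, so with a parallel source the arrival order of one key's elements is not necessarily the "1-2-3-4-5" order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a keyed rolling fold; NOT Flink code.
// Assumptions: one upstream producer emits elements in order, and keys are
// routed to subtasks with a simplified hash (Flink really uses key groups).
public class RollingFoldSketch {

    // Hypothetical element type of the keyed stream.
    static final class Event {
        final String key;
        final int value;

        Event(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    // Returns, per key, every value the fold emits, in emission order.
    static Map<String, List<String>> run(List<Event> input, int parallelism) {
        // One state map per simulated subtask: key -> last folded value.
        List<Map<String, String>> subtaskState = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtaskState.add(new HashMap<>());
        }
        Map<String, List<String>> emitted = new HashMap<>();
        for (Event e : input) {
            // All elements with the same key are routed to the same subtask.
            Map<String, String> state =
                subtaskState.get(Math.floorMod(e.key.hashCode(), parallelism));
            // Same combine step as the FoldFunction above: current + "-" + value.
            String folded = state.getOrDefault(e.key, "start") + "-" + e.value;
            state.put(e.key, folded);
            emitted.computeIfAbsent(e.key, k -> new ArrayList<>()).add(folded);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
            new Event("a", 1), new Event("b", 1), new Event("a", 2),
            new Event("a", 3), new Event("b", 2), new Event("a", 4),
            new Event("a", 5));
        // Key "a" gets the same sequence whether parallelism is 1 or 4.
        System.out.println(run(input, 1).get("a"));
        System.out.println(run(input, 4).get("a"));
    }
}
```

Both printed lines contain the same five values for key "a", from "start-1" up to "start-1-2-3-4-5", because the interleaved "b" elements are folded into a separate accumulator.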

From: jincheng sun [mailto:sunjincheng...@gmail.com]
Sent: Friday, 7 December 2018 02:24
To: rakkukumar2...@gmail.com
Cc: user@flink.apache.org; d...@flink.apache.org
Subject: [External] Re: delay one of the datastream when performing join 
operation on event-time and watermark

Hi Rakesh Kumar,
I think you can use an interval join, e.g.:

orderStream
    .keyBy(<KeySelector>)
    .intervalJoin(invoiceStream.keyBy(<KeySelector>))
    .between(Time.minutes(-5), Time.minutes(5))
    .process(<ProcessJoinFunction>); // required: emits the joined (order, invoice) pairs
For the semantics of the interval join and a detailed usage description, see
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/joining.html#interval-join
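To make the `between(Time.minutes(-5), Time.minutes(5))` condition concrete, here is a plain-Java sketch (not the Flink API) that evaluates the same rule over two finite lists: an invoice matches an order when both carry the same order_id and the invoice timestamp lies in [order time - 5 min, order time + 5 min], both bounds inclusive as in Flink's default. The names `IntervalJoinSketch`, `Order`, `Invoice` and `join` are hypothetical. In Flink the operator additionally buffers both sides in keyed state until the watermark passes the interval; the sketch ignores that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Plain-Java illustration of interval-join semantics; NOT the Flink API.
// Hypothetical types: Order(orderId, ts) and Invoice(invoiceId, orderId, ts).
public class IntervalJoinSketch {

    static final class Order {
        final String orderId;
        final long ts; // event time in milliseconds

        Order(String orderId, long ts) {
            this.orderId = orderId;
            this.ts = ts;
        }
    }

    static final class Invoice {
        final String invoiceId;
        final String orderId;
        final long ts; // event time in milliseconds

        Invoice(String invoiceId, String orderId, long ts) {
            this.invoiceId = invoiceId;
            this.orderId = orderId;
            this.ts = ts;
        }
    }

    // Pairs each order with every invoice of the same order_id whose timestamp
    // lies in [order.ts + lowerMs, order.ts + upperMs] (both bounds inclusive).
    static List<String> join(List<Order> orders, List<Invoice> invoices,
                             long lowerMs, long upperMs) {
        List<String> matches = new ArrayList<>();
        for (Order o : orders) {
            for (Invoice i : invoices) {
                boolean sameKey = o.orderId.equals(i.orderId);
                boolean inInterval = i.ts >= o.ts + lowerMs && i.ts <= o.ts + upperMs;
                if (sameKey && inInterval) {
                    matches.add(o.orderId + "/" + i.invoiceId);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        long fiveMin = TimeUnit.MINUTES.toMillis(5);
        List<Order> orders = List.of(new Order("o1", 0L), new Order("o2", 0L));
        List<Invoice> invoices = List.of(
            new Invoice("i1", "o1", TimeUnit.MINUTES.toMillis(3)),   // within 5 min
            new Invoice("i2", "o2", TimeUnit.MINUTES.toMillis(10))); // too late
        // Same bounds as between(Time.minutes(-5), Time.minutes(5)).
        System.out.println(join(orders, invoices, -fiveMin, fiveMin));
    }
}
```

Only the pair "o1/i1" is printed: the invoice for "o2" arrives 10 minutes after its order and falls outside the interval.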

Hope this helps, and any feedback is welcome!

Best,
Jincheng


Rakesh Kumar <rakkukumar2...@gmail.com> wrote on Thursday, 6 December 2018,
7:10 PM:
Hi,
I have two data sources: one for order data and another for invoice data. I
push both into Kafka topics as JSON. I want to delay the order data by 5
minutes, because invoice data arrives only after the order data is generated.
For that, I have written a Flink program that reads both streams from Kafka,
applies watermarks, and delays the order data by 5 minutes. After applying the
watermarks, I want to join the two streams on order_id, which is present in
both the order and the invoice data. After joining, I want to push the result
to a different Kafka topic.
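A "5 minute delay" on event time is usually expressed as a watermark that trails the highest event timestamp seen so far by a fixed bound. The plain-Java sketch below (not Flink code) shows that rule; as far as I know it is the same computation Flink's `BoundedOutOfOrdernessTimestampExtractor` performs, but treat that correspondence as an assumption. The names `WatermarkSketch`, `onEvent` and `isLate` are hypothetical, and "late" here follows the watermark definition: an element is late once its timestamp is at or below the current watermark.

```java
import java.util.concurrent.TimeUnit;

// Plain-Java sketch of a bounded-out-of-orderness watermark; NOT Flink code.
// Rule: watermark = highest event timestamp seen so far - fixed delay.
public class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start as low as possible without overflowing when the delay is subtracted.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Records one element's event time and returns the resulting watermark.
    long onEvent(long timestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMs);
        return currentWatermark();
    }

    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    // An element is late once its timestamp is at or below the watermark.
    boolean isLate(long timestampMs) {
        return timestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        long min = TimeUnit.MINUTES.toMillis(1);
        WatermarkSketch wm = new WatermarkSketch(5 * min);
        long[] eventTimes = {0, 2 * min, 1 * min, 7 * min};
        for (long t : eventTimes) {
            System.out.println("event at " + t / min + " min -> watermark "
                + wm.onEvent(t) / min + " min");
        }
        // After the 7-minute event the watermark is at 2 minutes,
        // so an element stamped at 1 minute now counts as late.
        System.out.println(wm.isLate(1 * min));
    }
}
```

The out-of-order element at 1 minute does not move the watermark backwards; it stays at -3 minutes until the 7-minute element pushes it to 2 minutes.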

But I am not able to join these data streams with the 5-minute delay, and I
cannot figure out why.

I am attaching my Flink program and its dependencies below.