Hi Amol,

I think you could try (based on your stack overflow code)
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
like this:

DataStream<Document> streamSource = env
   .addSource(kafkaConsumer)
   .setParallelism(4)
   .assignTimestampsAndWatermarks(
       new 
BoundedOutOfOrdernessTimestampExtractor<Document>(Time.milliseconds(3500)) {
           @Override
           public long extractTimestamp(Event element) {
               Map timeStamp = (Map) event.get("ts”);
               return (long) timeStamp.get("value");
           }
       });

In general, if records are sorted by anything in a Kafka partition, parallel 
subtask of Flink Kafka source will consume these records and push to user 
operators in the same order. There is maximum one consuming subtask per Kafka 
partition but several partitions might be served by one subtask. It means that 
there are the same guarantees as in Kafka: ordering per partition but not 
across them, including no global ordering. 

The case of global and per window ordering is already described by Sihua. The 
global ordering might be impractical in case of distributed system.

If a subtask of your Flink operator consumes from several partitions or there 
is no ordering at all, you can try the above approach with 
BoundedOutOfOrdernessTimestampExtractor to get approximate ordering across 
these partitions per key or all records. It is similar to ordering within a 
window. It means there could still be late records coming after out of 
orderness period of time which can break the ordering. This operator buffers 
records in state to maintain the order but only for out of orderness period of 
time which also increases latency.

Cheers,
Andrey

> On 19 Jun 2018, at 14:12, sihua zhou <summerle...@163.com> wrote:
> 
> 
> 
> Hi Amol,
> 
> 
> I'm not sure whether this is impossible, especially when you need to operate 
> the record in multi parallelism. 
> 
> 
> IMO, in theroy, we can only get a ordered stream when there is a single 
> partition of kafka and operate it with a single parallelism in flink. Even in 
> this case, if you only want to order the records in a window, than you need 
> to store the records in the state, and order them when the window is 
> triggered. But if you want to order the records with a single 
> `keyBy()`(non-window), I think that's maybe impossible in practice, because 
> you need to store the all the incoming records and order the all data for 
> every incoming records, also you need to send retracted message for the 
> previous result(because every incoming record might change the global order 
> of the records).
> 
> 
> Best, Sihua
> On 06/19/2018 19:19,Amol S - iProgrammer<am...@iprogrammer.com> wrote:
> Hi,
> 
> I have used flink streaming API in my application where the source of
> streaming is kafka. My kafka producer will publish data in ascending order
> of time in different partitions of kafka and consumer will read data from
> these partitions. However some kafka partitions may be slow due to some
> operation and produce late results. Is there any way to maintain order in
> this stream though the data arrive out of order. I have tried
> BoundedOutOfOrdernessTimestampExtractor but it didn't served the purpose.
> While digging this problem I came across your documentation (URL:
> https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams)
> and tried to implement this but it didnt worked. I also tried with Table
> API order by but it seems you not support orderBy in flink 1.5 version.
> Please suggest me any workaround for this.
> 
> I have raised same concern on stack overflow
> 
> https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions
> 
> Thanks,
> 
> -----------------------------------------------
> *Amol Suryawanshi*
> Java Developer
> am...@iprogrammer.com
> 
> 
> *iProgrammer Solutions Pvt. Ltd.*
> 
> 
> 
> *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
> MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> www.iprogrammer.com <sac...@iprogrammer.com>
> ------------------------------------------------

Reply via email to