Hello Andrey,

Thanks for the help.

I am trying to implement the code you gave above:

    sourceStream
        .setParallelism(4)
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<>(…) {…})
        .windowAll(TumblingEventTimeWindows.of(Time...))
        .process(new OrderTheRecords());

but I am having trouble writing the *OrderTheRecords* class, as I am new to this framework. Can you please suggest the optimal way to sort the records? I have implemented the ProcessWindowFunction below:

    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class OrderTheRecords extends ProcessWindowFunction<Oplog, Oplog, Long, TimeWindow> {

        @Override
        public void process(Long s, Context context, Iterable<Oplog> iterable, Collector<Oplog> collector) throws Exception {
            // Currently forwards the window's records unchanged; no sorting yet.
            for (Oplog oplog : iterable) {
                collector.collect(oplog);
            }
        }
    }

    public class Oplog {
        private OplogTimestamp ts;
        private String op;
        private BasicDBObject o;
    }

Here *ts* represents the event timestamp.

-----------------------------------------------
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com

*iProgrammer Solutions Pvt. Ltd.*

*Office 103, 104, 1st Floor Pride Portal, Shivaji Housing Society, Bahiratwadi, Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016, MH, INDIA.*
*Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>
------------------------------------------------

On Wed, Jun 20, 2018 at 6:51 PM, Andrey Zagrebin <and...@data-artisans.com> wrote:

> Hi Amol,
>
> > In the above code too, it will only sort the records within a specific time window.
>
> All windows will be emitted as the watermark passes the end of the window. The watermark only increases, so the non-overlapping windows are also emitted in time order, and as a consequence so are the records across windows, if the concern is about sorting records only within a specific time window.
>
> > 1. How should I create N consumers dynamically based on partition count?
>
> sourceStream.setParallelism(N): each parallel subtask of the Flink consumer will serve one Kafka partition.
>
> > 2. Does the number of consumers grow dynamically as the number of partitions increases in the middle of execution?
>
> Dynamically added Kafka partitions will eventually be discovered by the Flink consumers (flink.partition-discovery.interval-millis) and picked up by some consumer. The Flink job has to be rescaled separately.
>
> Currently, the parallelism of a Flink operator cannot be changed while the job is running. The way to go for now is to use a savepoint/checkpoint: stop the job and start a new one with the changed parallelism from the previous savepoint/checkpoint (see the Flink docs). The new job will pick up from the partition offsets of the previous job.
>
> > 3. How do I create a partition-specific Kafka consumer in Flink?
>
> The partition-to-consumer assignment is currently implementation-specific in Flink. There is an open issue for custom assignment, https://issues.apache.org/jira/browse/FLINK-8570, e.g. if you need specific locality of keys/consumers.
>
> I would simply suggest assigning some key to each record and letting all records for a particular key go into the same Kafka partition. On the Flink side, if a corresponding keyBy() is applied to the Kafka source, all the records for that key will go to the same parallel subtask of the subsequent operator, sorted by time if they were originally sorted in their Kafka partition. This is a more scalable approach than total global ordering.
>
> Cheers,
> Andrey
>
> On 20 Jun 2018, at 13:17, Amol S - iProgrammer <am...@iprogrammer.com> wrote:
>
> Hello Andrey,
>
> In the above code too, it will only sort the records within a specific time window. Anyway, we agreed to create N partitions with N consumers based on some key, as order is maintained per Kafka partition.
>
> I have some questions about this.
>
> 1. How should I create N consumers dynamically based on partition count?
> 2. Does the number of consumers grow dynamically as the number of partitions increases in the middle of execution?
> 3. How do I create a partition-specific Kafka consumer in Flink?
>
> On Wed, Jun 20, 2018 at 2:38 PM, Andrey Zagrebin <and...@data-artisans.com> wrote:
>
> Hi,
>
> Good point, sorry for the confusion. BoundedOutOfOrdernessTimestampExtractor of course does not buffer records; you need to apply windowing (e.g. TumblingEventTimeWindows) for that, then sort the window output by time and emit the records in sorted order.
>
> You can also use windowAll, which already does keyBy((record) -> 0) and makes the stream non-parallel:
>
>     sourceStream
>         .setParallelism(4)
>         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<>(…) {…})
>         .windowAll(TumblingEventTimeWindows.of(Time...))
>         .process(new OrderTheRecords());
>
> Cheers,
> Andrey
>
> On 20 Jun 2018, at 10:14, sihua zhou <summerle...@163.com> wrote:
>
> Hi,
>
> I think global ordering is a bit impractical in production, but in theory you can still do it. You need to:
>
> - First, fix the operators' parallelism to 1 (except for the source node).
> - If you want to sort the records within a bounded time, then you can keyBy() a constant and window it, buffer the records in state, and sort the records when the window is triggered. The code may look as follows:
> {code}
> sourceStream
>     .setParallelism(4)
>     .assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor<Document>(Time.milliseconds(3500)) {
>             @Override
>             public long extractTimestamp(Document element) {
>                 // The event time is stored under "ts" -> "value".
>                 Map timeStamp = (Map) element.get("ts");
>                 return (long) timeStamp.get("value");
>             }
>         })
>     .keyBy((record) -> 0) // keyBy the constant value
>     .window(...)
>     .process(new OrderTheRecords())
>     .setParallelism(1);
> {code}
>
> - If you want to sort the records truly globally (non-window), then you could keyBy a constant, store the records in state, and sort the records in the process() function for every incoming record. And if you want perfectly correct output, you may need to do retraction (because every incoming record may change the global order). The code may look as follows:
> {code}
> sourceStream
>     .setParallelism(4)
>     .keyBy((record) -> 0) // keyBy the constant value
>     .process(new OrderTheRecords())
>     .setParallelism(1);
> {code}
>
> In all cases, you need to fix the parallelism of the OrderTheRecords operator to 1, which makes your job non-scalable and turns that operator into the bottleneck. So global ordering may not be practical in production (but if the source's TPS is very low, it may be).
>
> Best, Sihua
>
> On 06/20/2018 15:36, Amol S - iProgrammer <am...@iprogrammer.com> wrote:
>
> Hello Andrey,
>
> Thanks for your quick response. I have tried your above code, but it didn't suit my requirement. I need global ordering of my records while using multiple Kafka partitions. Please suggest a workaround for this. As mentioned in this <https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams> link, is it possible to buffer data for some amount of time and then sort it, or is there any other way out there?
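Regarding the OrderTheRecords question: the buffer-then-sort step that Sihua and Andrey describe (take the records of one fired window, sort them by event time, emit them in that order) can be sketched as below. It is plain Java so it runs without Flink; WindowSorter and sortAndEmit are hypothetical names. Note that with windowAll(...), process(...) expects a ProcessAllWindowFunction<Oplog, Oplog, TimeWindow> (no key type parameter) rather than a ProcessWindowFunction<Oplog, Oplog, Long, TimeWindow>; its process(Context context, Iterable<Oplog> elements, Collector<Oplog> out) method could call WindowSorter.sortAndEmit(elements, timestampOf, out::collect). How an OplogTimestamp becomes a long is not shown in the thread, so the timestamp function is passed in as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

// Plain-Java core of a hypothetical OrderTheRecords: buffer one window's records,
// sort them by event time, then emit them in that order.
final class WindowSorter {
    private WindowSorter() {}

    static <T> void sortAndEmit(Iterable<T> elements,
                                ToLongFunction<? super T> timestampOf,
                                Consumer<? super T> out) {
        // The window contents are only available as an Iterable, so copy them into a list first.
        List<T> buffer = new ArrayList<>();
        for (T element : elements) {
            buffer.add(element);
        }
        // List.sort is stable: records with equal timestamps keep the order in which they were buffered.
        buffer.sort((a, b) -> Long.compare(timestampOf.applyAsLong(a), timestampOf.applyAsLong(b)));
        for (T element : buffer) {
            out.accept(element);
        }
    }
}
```

Sorting each window on its own is enough here because, as Andrey points out, windowAll emits non-overlapping windows in watermark order; records that arrive after their window has already fired are not covered by this sort.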
>
> On Tue, Jun 19, 2018 at 10:19 PM, Andrey Zagrebin <and...@data-artisans.com> wrote:
>
> Hi Amol,
>
> I think you could try (based on your Stack Overflow code) org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor like this:
>
>     DataStream<Document> streamSource = env
>         .addSource(kafkaConsumer)
>         .setParallelism(4)
>         .assignTimestampsAndWatermarks(
>             new BoundedOutOfOrdernessTimestampExtractor<Document>(Time.milliseconds(3500)) {
>                 @Override
>                 public long extractTimestamp(Document element) {
>                     // The event time is stored under "ts" -> "value".
>                     Map timeStamp = (Map) element.get("ts");
>                     return (long) timeStamp.get("value");
>                 }
>             });
>
> In general, if records are sorted by anything within a Kafka partition, the parallel subtask of the Flink Kafka source will consume these records and push them to user operators in the same order. There is at most one consuming subtask per Kafka partition, but several partitions might be served by one subtask. This means you get the same guarantees as in Kafka: ordering per partition but not across partitions, and hence no global ordering.
>
> The case of global and per-window ordering has already been described by Sihua. Global ordering might be impractical in a distributed system.
>
> If a subtask of your Flink operator consumes from several partitions, or there is no ordering at all, you can try the above approach with BoundedOutOfOrdernessTimestampExtractor to get approximate ordering across these partitions, per key or for all records. It is similar to ordering within a window. It means there could still be late records arriving after the out-of-orderness period, which can break the ordering. This operator buffers records in state to maintain the order, but only for the out-of-orderness period, which also increases latency.
>
> Cheers,
> Andrey
>
> On 19 Jun 2018, at 14:12, sihua zhou <summerle...@163.com> wrote:
>
> Hi Amol,
>
> I'm not sure whether this is impossible, especially when you need to process the records with a parallelism greater than one.
>
> IMO, in theory, we can only get an ordered stream when there is a single Kafka partition and it is processed with a single parallelism in Flink. Even in this case, if you only want to order the records in a window, then you need to store the records in state and order them when the window is triggered. But if you want to order the records with a single `keyBy()` (non-window), I think that may be impossible in practice, because you need to store all the incoming records and re-order all the data for every incoming record; you also need to send a retraction message for the previous result (because every incoming record might change the global order of the records).
>
> Best, Sihua
>
> On 06/19/2018 19:19, Amol S - iProgrammer <am...@iprogrammer.com> wrote:
>
> Hi,
>
> I have used the Flink streaming API in my application, where the source of the stream is Kafka. My Kafka producer publishes data in ascending order of time to different Kafka partitions, and the consumer reads data from these partitions. However, some Kafka partitions may be slow due to some operation and produce late results. Is there any way to maintain order in this stream even though the data arrives out of order? I have tried BoundedOutOfOrdernessTimestampExtractor, but it didn't serve the purpose.
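To make the trade-off Andrey and Sihua describe concrete (approximate ordering, late records, added latency), here is a plain-Java simulation of watermark-driven reordering under a bounded out-of-orderness assumption. It is not Flink code, and BoundedReorderBuffer is a hypothetical class. The watermark is computed the way BoundedOutOfOrdernessTimestampExtractor computes it (highest timestamp seen so far minus the bound); the buffer itself stands in for the downstream window or process function that actually holds records, since the extractor only assigns timestamps and watermarks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simulation only (not Flink code): reorders timestamps using a watermark of
// "max timestamp seen - maxOutOfOrderness" and reports records that arrive too late.
final class BoundedReorderBuffer {
    private final long maxOutOfOrderness;
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private final List<Long> emitted = new ArrayList<>();
    private final List<Long> late = new ArrayList<>();
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedReorderBuffer(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    long currentWatermark() {
        // Avoid underflow before the first record has been seen.
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - maxOutOfOrderness;
    }

    void onRecord(long timestamp) {
        if (timestamp <= currentWatermark()) {
            // The watermark already passed this timestamp: emitting it now would break the order.
            late.add(timestamp);
            return;
        }
        pending.add(timestamp);
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        long watermark = currentWatermark();
        // Release, smallest first, every buffered record the watermark has passed.
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            emitted.add(pending.poll());
        }
    }

    /** End of input: release everything still buffered, in timestamp order. */
    void flush() {
        while (!pending.isEmpty()) {
            emitted.add(pending.poll());
        }
    }

    List<Long> emitted() { return emitted; }

    List<Long> late() { return late; }
}
```

With a bound of 3 and input 1, 4, 2, 7, 3, 6, 10, the records 1, 2, 4, 6, 7 are released in order while 10 waits for a later watermark (the latency cost), and 3 is reported as late because the watermark had already reached 4 when it arrived.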
>
> While digging into this problem I came across your documentation (URL: https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams) and tried to implement it, but it didn't work. I also tried the Table API orderBy, but it seems you do not support orderBy in Flink version 1.5. Please suggest a workaround for this.
>
> I have raised the same concern on Stack Overflow:
>
> https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions
>
> Thanks,
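Andrey's suggestion earlier in this thread (give each record a key, send all records with the same key to the same Kafka partition, and apply a matching keyBy() in Flink) trades global ordering for per-key ordering. The plain-Java simulation below illustrates why per-key order survives even when partitions are consumed at different speeds. It is not Kafka or Flink code: PerKeyOrdering is hypothetical, and partitionFor uses String.hashCode() modulo the partition count as a stand-in for Kafka's default partitioner, which hashes the serialized key bytes with murmur2 instead.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulation only (not Kafka or Flink code): routing every record of a key to one partition
// keeps that key's records in producer order, even if partitions are read unevenly.
final class PerKeyOrdering {
    private PerKeyOrdering() {}

    // Hypothetical partitioner: String.hashCode() stands in for Kafka's murmur2-based default.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /**
     * Appends (key, sequence number) records to per-partition FIFO queues, then reads the partitions
     * last-to-first to mimic one partition lagging behind. Every sequence number read is appended to
     * consumptionOrder; the return value groups the sequence numbers by key in the order they were read.
     */
    static Map<String, List<Integer>> consume(List<String> keysInProducerOrder,
                                              int numPartitions,
                                              List<Integer> consumptionOrder) {
        List<Deque<Map.Entry<String, Integer>>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayDeque<>());
        }
        for (int seq = 0; seq < keysInProducerOrder.size(); seq++) {
            String key = keysInProducerOrder.get(seq);
            partitions.get(partitionFor(key, numPartitions))
                      .addLast(new AbstractMap.SimpleImmutableEntry<>(key, seq));
        }
        Map<String, List<Integer>> seenPerKey = new LinkedHashMap<>();
        for (int p = numPartitions - 1; p >= 0; p--) {
            for (Map.Entry<String, Integer> record : partitions.get(p)) {
                consumptionOrder.add(record.getValue());
                seenPerKey.computeIfAbsent(record.getKey(), k -> new ArrayList<>()).add(record.getValue());
            }
        }
        return seenPerKey;
    }
}
```

With keys a, b, a, c, b, a and two partitions, the consumer reads the sequence numbers 0, 2, 3, 5, 1, 4, which is not globally ordered, yet each key's own sequence (a: 0, 2, 5; b: 1, 4; c: 3) stays in producer order.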