Hi,


I think a global ordering is a bit impractical on production, but in theroy, 
you still can do that. You need to


- Firstly fix the operate's parallelism to 1(except the source node).
- If you want to sort the records within a bouned time, then you can keyBy() a 
constant and window it, buffer the records in the state and sort the records 
when the window is triggered, the code maybe as follows.
    {code}
        sourceStream
            .setParallelism(4)
            .assignTimestampsAndWatermarks(
                new 
BoundedOutOfOrdernessTimestampExtractor<Document>(Time.milliseconds(3500))
                {
                    @Override
                    public long extractTimestamp(Event element) {
                        Map timeStamp = (Map) event.get("ts”);
                        return (long) timeStamp.get("value");
                    }
             })
            .keyBy((record) -> 0)// keyby the constant value
            .window(...)
            .process(new OrderTheRecords()))
            .setParallelism(1);
    {code}


- If you want to sort the records truly globally(non-window), then you could 
keyBy a constant, store the records in the state and sort the records in the 
process() function for every incoming record. And if you want a perfect correct 
output, then maybe you need to do retraction (because every incoming records 
may change the globally order), the code maybe as follows
    {code}
        sourceStream
            .setParallelism(4)
            .keyBy((record) -> 0) // keyby the constant value
            .process(new OrderTheRecords()))
            .setParallelism(1);
    {code}




In all the case, you need to fix the parallelism of the OrderTheRecord operate 
to 1, which makes your job non-scale-able and becomes the bottleneck. So a 
global ordering maybe not practical on production (but if the source's TPS is 
very low, then maybe practical).


Best, Sihua


On 06/20/2018 15:36,Amol S - iProgrammer<am...@iprogrammer.com> wrote:
Hello Andrey,

Thanks for your quick response. I have tried with your above code but it
didn't suit's my requirement. I need global ordering of my records by using
multiple kafka partitions. Please suggest me any workaround for this. as
mentioned in this
<https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams>
link is it possible to buffer data for some amount of time and then perform
sort on this or any other way out there?

-----------------------------------------------
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>
------------------------------------------------

On Tue, Jun 19, 2018 at 10:19 PM, Andrey Zagrebin <and...@data-artisans.com>
wrote:

Hi Amol,

I think you could try (based on your stack overflow code)
org.apache.flink.streaming.api.functions.timestamps.
BoundedOutOfOrdernessTimestampExtractor
like this:

DataStream<Document> streamSource = env
.addSource(kafkaConsumer)
.setParallelism(4)
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Document>(Time.milliseconds(3500))
{
@Override
public long extractTimestamp(Event element) {
Map timeStamp = (Map) event.get("ts”);
return (long) timeStamp.get("value");
}
});

In general, if records are sorted by anything in a Kafka partition,
parallel subtask of Flink Kafka source will consume these records and push
to user operators in the same order. There is maximum one consuming subtask
per Kafka partition but several partitions might be served by one subtask.
It means that there are the same guarantees as in Kafka: ordering per
partition but not across them, including no global ordering.

The case of global and per window ordering is already described by Sihua.
The global ordering might be impractical in case of distributed system.

If a subtask of your Flink operator consumes from several partitions or
there is no ordering at all, you can try the above approach with
BoundedOutOfOrdernessTimestampExtractor to get approximate ordering
across these partitions per key or all records. It is similar to ordering
within a window. It means there could still be late records coming after
out of orderness period of time which can break the ordering. This operator
buffers records in state to maintain the order but only for out of
orderness period of time which also increases latency.

Cheers,
Andrey

On 19 Jun 2018, at 14:12, sihua zhou <summerle...@163.com> wrote:



Hi Amol,


I'm not sure whether this is impossible, especially when you need to
operate the record in multi parallelism.


IMO, in theroy, we can only get a ordered stream when there is a single
partition of kafka and operate it with a single parallelism in flink. Even
in this case, if you only want to order the records in a window, than you
need to store the records in the state, and order them when the window is
triggered. But if you want to order the records with a single
`keyBy()`(non-window), I think that's maybe impossible in practice, because
you need to store the all the incoming records and order the all data for
every incoming records, also you need to send retracted message for the
previous result(because every incoming record might change the global order
of the records).


Best, Sihua
On 06/19/2018 19:19,Amol S - iProgrammer<am...@iprogrammer.com> wrote:
Hi,

I have used flink streaming API in my application where the source of
streaming is kafka. My kafka producer will publish data in ascending
order
of time in different partitions of kafka and consumer will read data from
these partitions. However some kafka partitions may be slow due to some
operation and produce late results. Is there any way to maintain order in
this stream though the data arrive out of order. I have tried
BoundedOutOfOrdernessTimestampExtractor but it didn't served the
purpose.
While digging this problem I came across your documentation (URL:
https://cwiki.apache.org/confluence/display/FLINK/Time+
and+Order+in+Streams)
and tried to implement this but it didnt worked. I also tried with Table
API order by but it seems you not support orderBy in flink 1.5 version.
Please suggest me any workaround for this.

I have raised same concern on stack overflow

https://stackoverflow.com/questions/50904615/ordering-
of-streams-while-reading-data-from-multiple-kafka-partitions

Thanks,

-----------------------------------------------
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune -
411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>
------------------------------------------------


Reply via email to