Just wanted to add, that there is always the potential about late
arriving records, and thus, ordering by timestamp will never be perfect...

You should rather try to design you application in a way such that it
can handle out-of-order data gracefully and try to avoid the necessity
of ordering records by timestamp.


-Matthias

On 3/1/17 7:31 AM, Damian Guy wrote:
> You could implement your own based sorting algorithm using the low level
> processor api, i.e, you have a processor that keeps a sorted list of
> records and then, periodically, perhaps on punctuate, it emits the sorted
> messages downstream. You could do something like:
> 
>     builder.stream("topic").transform(new TransformerSupplier() {
> 
>         @Override
>         public Transformer get() {
>             return new TheTransformer();
>     }
> }).groupByKey().reduce(..);
> 
> Where the TheTransformer might look something like:
> 
> private static class TheTransformer<K, V, R> implements Transformer<K, V, R> {
>     private ProcessorContext context;
>     private TreeMap<K, V> sorted = new TreeMap<>();
> 
>     @Override
>     public void init(final ProcessorContext context) {
>         this.context = context;
>         context.schedule(1000); // punctuate every 1 second of streams-time
>     }
> 
>     @Override
>     public R transform(final K key, final V value) {
>         // do stuff
>         sorted.put(key, value);
>     }
> 
>     @Override
>     public R punctuate(final long timestamp) {
>         for (final Map.Entry<K, V> kvEntry : sorted.entrySet()) {
>             context.forward(kvEntry.getKey(), kvEntry.getValue());
>         }
>         sorted.clear();
>         return null;
>     }
> 
>     @Override
>     public void close() {
> 
>     }
> }
> 
> 
> 
> 
> 
> On Wed, 1 Mar 2017 at 13:04 Ofir Sharony <ofir.shar...@myheritage.com>
> wrote:
> 
>> Is there any way to sort grouped records before sending them to the
>> reducer?
>>
>> *Ofir Sharony*
>> BackEnd Tech Lead
>>
>> Mobile: +972-54-7560277 <+972%2054-756-0277> | ofir.shar...@myheritage.com
>> | www.myheritage.com
>> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
>>
>> <http://www.myheritage.com/>
>>
>> <https://www.facebook.com/myheritage>
>> <https://twitter.com/myheritage>         <http://blog.myheritage.com/>
>>     <https://www.youtube.com/user/MyHeritageLtd>
>>
>>
>> On Wed, Mar 1, 2017 at 3:03 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The TimestampExtractor won't effect the order the records arrive in. It
>>> just provides a way for developers to use a timestamp other than the
>>> default.
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Wed, 1 Mar 2017 at 12:34 Ofir Sharony <ofir.shar...@myheritage.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have the following code on a stream:
>>>>
>>>> .selectKey(...)
>>>> .groupByKey(...)
>>>> .reduce(...)
>>>>
>>>> The records arrived to the Reducer function in the same order they were
>>>> consumed from Kafka
>>>> I have implemented a TimestampExtractor, extracting the wanted
>> timestamp
>>>> from each record, but unfortunately this didn't have any effect on the
>>>> order the messages were received in the Reducer.
>>>>
>>>> Any thoughts on that?
>>>> Thanks,
>>>>
>>>> *Ofir Sharony*
>>>> BackEnd Tech Lead
>>>>
>>>> Mobile: +972-54-7560277 <+972%2054-756-0277> <+972%2054-756-0277>
>> <+972%2054-756-0277> |
>>>> ofir.shar...@myheritage.com
>>>> | www.myheritage.com
>>>> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
>>>>
>>>> <http://www.myheritage.com/>
>>>>
>>>> <https://www.facebook.com/myheritage>
>>>> <https://twitter.com/myheritage>         <http://blog.myheritage.com/>
>>>>     <https://www.youtube.com/user/MyHeritageLtd>
>>>>
>>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to