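And here is the comparator function. The order of the return values matters: returning 1, 0, -1 (1 when the first element is larger) sorts ascending. With the values the other way round (-1, 0, 1) it sorts descending, as I discovered.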
import java.util.Comparator;
import org.apache.flink.api.java.tuple.Tuple4;

public static class SortEventsHandlingTime implements Comparator<Tuple4<Long, Long, String, String>> {
    // Compare two Tuple4 objects by their first field (f0).
    @Override
    public int compare(Tuple4<Long, Long, String, String> o1, Tuple4<Long, Long, String, String> o2) {
        long first = o1.f0;
        long second = o2.f0;
        if (first > second) {
            return 1;   // o1 sorts after o2
        } else if (first == second) {
            return 0;   // equal, order unchanged
        } else {
            return -1;  // o1 sorts before o2
        }
    }
}

On Mon, 7 Mar 2022 at 03:05, yidan zhao <hinobl...@gmail.com> wrote:

> Collect the elements to a list, then sort, then collect out.
>
> On Thu, 3 Mar 2022 at 22:13, HG <hanspeter.sl...@gmail.com> wrote:
>
>> Hi,
>> I need to sort the input of the ProcessWindowFunction by one of the
>> fields of the Tuple4 that is in the Iterator.
>>
>> Any advice as to what the best way is?
>>
>> static class MyProcessWindowFunction extends
>>         ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, TimeWindow> {
>>     @Override
>>     public void process(String key, Context context,
>>             Iterable<Tuple4<Long, Long, String, String>> input, Collector<String> out) {
>>         Long elapsed = 0L;
>>         Long pHandlingTime = 0L;
>>         Long totalElapsed = 0L;
>>
>>         System.out.println(input.getClass());
>>
>>         Iterator<Tuple4<Long, Long, String, String>> etter = input.iterator();
>>         for (Tuple4<Long, Long, String, String> in : input) {
>>             transactionId = in.getField(2).toString();
>>             elapsed = Long.parseLong(in.getField(1).toString()) - pHandlingTime;
>>             totalElapsed = totalElapsed + elapsed;
>>             pHandlingTime = Long.parseLong(in.getField(1).toString());
>>
>>             out.collect("Key : " + key + " Window : " + context.window()
>>                     + " transactionId : " + transactionId + " elapsed : " + elapsed.toString()
>>                     + " max handling time : " + h.toString()
>>                     + " totalElapsed " + totalElapsed);
>>         }
>>     }
>> }
>>
>> On Thu, 3 Mar 2022 at 15:12, HG <hanspeter.sl...@gmail.com> wrote:
>>
>>> Hi,
>>> I need to sort the input of the ProcessWindowFunction by one of the
>>> fields of the Tuple4 that is in the Iterator.
>>>
>>> static class MyProcessWindowFunction extends
>>>         ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, TimeWindow> {
>>>     @Override
>>>     public void process(String key, Context context,
>>>             Iterable<Tuple4<Long, Long, String, String>> input, Collector<String> out) {
>>>         Long elapsed = 0L;
>>>         Long pHandlingTime = 0L;
>>>         Long totalElapsed = 0L;
>>>
>>>         System.out.println(input.getClass());
>>>
>>>         Iterator<Tuple4<Long, Long, String, String>> etter = input.iterator();
>>>         for (Tuple4<Long, Long, String, String> in : input) {
>>>             transactionId = in.getField(2).toString();
>>>             elapsed = Long.parseLong(in.getField(1).toString()) - pHandlingTime;
>>>             totalElapsed = totalElapsed + elapsed;
>>>             pHandlingTime = Long.parseLong(in.getField(1).toString());
>>>
>>>             out.collect("Key : " + key + " Window : " + context.window()
>>>                     + " transactionId : " + transactionId + " elapsed : " + elapsed.toString()
>>>                     + " max handling time : " + h.toString()
>>>                     + " totalElapsed " + totalElapsed);
>>>         }
>>>     }
>>> }
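
For anyone finding this thread later: the advice quoted above (collect the elements to a list, sort, then emit) can be sketched without Flink on the classpath. This is only a rough illustration, not the code I run in the job. The Event class is a hypothetical stand-in for Tuple4<Long, Long, String, String>, and SortWindowSketch, ASCENDING, DESCENDING and sorted are names made up for this example. Inside process() the same idea would be: copy the Iterable into an ArrayList, sort it with the comparator, then loop over the sorted list and call out.collect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortWindowSketch {

    // Hypothetical stand-in for Tuple4<Long, Long, String, String>,
    // so that the sketch compiles and runs without Flink.
    public static final class Event {
        public final long f0;
        public final long f1;
        public final String f2;
        public final String f3;

        public Event(long f0, long f1, String f2, String f3) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
            this.f3 = f3;
        }
    }

    // Ascending on f0: negative when a < b, zero when equal, positive when a > b.
    public static final Comparator<Event> ASCENDING = (a, b) -> Long.compare(a.f0, b.f0);

    // Descending: the same comparison with the sign flipped.
    public static final Comparator<Event> DESCENDING = ASCENDING.reversed();

    // An Iterable has no sort method, so copy it into a list first and sort the copy.
    public static List<Event> sorted(Iterable<Event> input, Comparator<Event> order) {
        List<Event> list = new ArrayList<>();
        for (Event e : input) {
            list.add(e);
        }
        list.sort(order);
        return list;
    }

    public static void main(String[] args) {
        List<Event> window = Arrays.asList(
                new Event(30L, 3L, "tx-c", "x"),
                new Event(10L, 1L, "tx-a", "x"),
                new Event(20L, 2L, "tx-b", "x"));

        // Emit in sorted order; in the real job this loop would call out.collect(...).
        for (Event e : sorted(window, ASCENDING)) {
            System.out.println(e.f0 + " " + e.f2);
        }
    }
}
```

Using Long.compare avoids writing the 1 / 0 / -1 branches by hand, and reversed() gives the descending order without a second class.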