For the record. So that other unexperienced people my benefit too π¬ List<Tuple4<Long, Long, String, String>> inputList = new ArrayList<>();
input.forEach(inputList::add); inputList.sort(new SortEventsHandlingTime()); for (Tuple4<Long, Long, String, String> in: inputList){ Op ma 7 mrt. 2022 om 03:05 schreef yidan zhao <hinobl...@gmail.com>: > Collect the elements to a list, then sort, then collect out. > > HG <hanspeter.sl...@gmail.com> δΊ2022εΉ΄3ζ3ζ₯ε¨ε 22:13ειοΌ > >> Hi, >> I have need to sort the input of the ProcesWindowFunction by one of the >> fields of the Tuple4 that is in the Iterator. >> >> Any advice as to what the best way is? >> >> static class MyProcessWindowFunction extends >> ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, >> TimeWindow> { >> @Override >> public void process(String key, Context context, >> Iterable<Tuple4<Long, Long, String, String>> input, Collector<String> out) >> { >> Long elapsed = 0L; >> Long pHandlingTime = 0L; >> Long totalElapsed = 0L >> >> System.out.println(input.getClass()); >> >> Iterator<Tuple4<Long, Long, String, String>> etter = >> input.iterator(); >> *for (Tuple4<Long, Long, String, String> in: input){* >> transactionId = in.getField(2).toString(); >> elapsed = Long.parseLong(in.getField(1).toString()) >> - pHandlingTime; >> totalElapsed = totalElapsed + elapsed; >> pHandlingTime = Long.parseLong(in.getField(1).toString()) >> >> out.collect("Key : " + key + " Window : " + >> context.window() + " transactionId : " + transactionId + " elapsed : " + >> elapsed.toString() + " max handling time : " + h.toString() + " >> totalElapsed " + totalElapsed); >> } >> } >> } >> >> >> Op do 3 mrt. 2022 om 15:12 schreef HG <hanspeter.sl...@gmail.com>: >> >>> Hi, >>> I have need to sort the input of the ProcesWindowFunction by one of the >>> fields of the Tuple4 that is in the Iterator. >>> >>> static class MyProcessWindowFunction extends >>> ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, >>> TimeWindow> { >>> @Override >>> public void process(String key, Context context, >>> Iterable<Tuple4<Long, Long, String, String>> input, Collector<String> out) >>> { >>> Long elapsed = 0L; >>> Long pHandlingTime = 0L; >>> Long totalElapsed = 0L >>> >>> System.out.println(input.getClass()); >>> >>> Iterator<Tuple4<Long, Long, String, String>> etter = >>> input.iterator(); >>> *for (Tuple4<Long, Long, String, String> in: input){* >>> transactionId = in.getField(2).toString(); >>> elapsed = >>> Long.parseLong(in.getField(1).toString()) - pHandlingTime; >>> totalElapsed = totalElapsed + elapsed; >>> pHandlingTime = Long.parseLong(in.getField(1).toString()) >>> >>> out.collect("Key : " + key + " Window : " + >>> context.window() + " transactionId : " + transactionId + " elapsed : " + >>> elapsed.toString() + " max handling time : " + h.toString() + " >>> totalElapsed " + totalElapsed); >>> } >>> } >>> } >>> >>