Hi,
I need to sort the input of the ProcessWindowFunction by one of the
fields of the Tuple4 elements in the Iterable.

Any advice on the best way to do this?

 import java.util.Iterator;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;

 static class MyProcessWindowFunction
         extends ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, TimeWindow> {
        @Override
        public void process(String key, Context context,
                Iterable<Tuple4<Long, Long, String, String>> input, Collector<String> out)
        {
            Long elapsed       = 0L;
            Long pHandlingTime = 0L;
            Long totalElapsed  = 0L;
            String transactionId;

            System.out.println(input.getClass());

            Iterator<Tuple4<Long, Long, String, String>> etter = input.iterator();
            // This is the iteration that should visit the elements sorted by one Tuple4 field.
            for (Tuple4<Long, Long, String, String> in : input) {
                transactionId = in.f2;
                elapsed       = in.f1 - pHandlingTime;   // time since the previous element
                totalElapsed  = totalElapsed + elapsed;
                pHandlingTime = in.f1;

                // NOTE: h is not declared anywhere in this snippet.
                out.collect("Key : " + key + " Window : " + context.window()
                        + "  transactionId : " + transactionId
                        + "  elapsed : " + elapsed
                        + "  max handling time : " + h
                        + " totalElapsed " + totalElapsed);
            }
        }
    }
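
One possible approach, sketched here in plain Java so it runs without Flink on the classpath: copy the elements of the Iterable into a List, sort that copy with a Comparator, and loop over the sorted copy instead of the raw input. Row is a hypothetical stand-in for Tuple4<Long, Long, String, String>, and sortedCopy is an illustrative helper name, not a Flink API. Sorting ascending on f1 is an assumption; any other field works the same way.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SortWindowInput {

    // Hypothetical stand-in for Flink's Tuple4<Long, Long, String, String>,
    // used only so this sketch compiles without Flink.
    static final class Row {
        final Long f0;
        final Long f1;
        final String f2;
        final String f3;

        Row(Long f0, Long f1, String f2, String f3) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
            this.f3 = f3;
        }
    }

    // Copies every element of the Iterable into a new list and sorts that copy.
    // The Iterable itself is not modified.
    static <T> List<T> sortedCopy(Iterable<T> input, Comparator<? super T> order) {
        List<T> buffer = new ArrayList<>();
        for (T element : input) {
            buffer.add(element);
        }
        buffer.sort(order);
        return buffer;
    }

    public static void main(String[] args) {
        List<Row> window = new ArrayList<>();
        window.add(new Row(1L, 300L, "tx-c", "x"));
        window.add(new Row(2L, 100L, "tx-a", "x"));
        window.add(new Row(3L, 200L, "tx-b", "x"));

        // Sort ascending by the second field (f1).
        Comparator<Row> byF1 = Comparator.comparingLong(r -> r.f1);

        long previous = 0L;
        for (Row r : sortedCopy(window, byF1)) {
            long elapsed = r.f1 - previous;  // difference to the previous sorted element
            previous = r.f1;
            System.out.println(r.f2 + " elapsed " + elapsed);
        }
    }
}
```

Inside process(), the same helper could presumably be called as sortedCopy(input, Comparator.comparing((Tuple4<Long, Long, String, String> t) -> t.f1)) and the for loop run over the returned list. Note that the copy holds all elements of the window in memory at once.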


On Thu, 3 Mar 2022 at 15:12, HG <hanspeter.sl...@gmail.com> wrote:

> Hi,
> I need to sort the input of the ProcessWindowFunction by one of the
> fields of the Tuple4 elements in the Iterable.
>
