Logged the internal window information:

Window{start=1528043030000, end=1528043040000} key=t6  1
Window{start=1528043040000, end=1528043050000} key=t1  2
Window{start=1528043040000, end=1528043050000} key=t7  3
Window{start=1528043040000, end=1528043050000} key=t5  4
Window{start=1528043040000, end=1528043050000} key=t5  4,5
Window{start=1528043050000, end=1528043060000} key=t6  6
Window{start=1528043050000, end=1528043060000} key=t6  6,7
Window{start=1528043050000, end=1528043060000} key=t4  8
Window{start=1528043060000, end=1528043070000} key=t6  9
Window{start=1528043060000, end=1528043070000} key=t7  10
Window{start=1528043060000, end=1528043070000} key=t6  9,11
Window{start=1528043070000, end=1528043080000} key=t5  12
Window{start=1528043070000, end=1528043080000} key=t6  13
Window{start=1528043070000, end=1528043080000} key=t4  14
Window{start=1528043070000, end=1528043080000} key=t4  14,15

....
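The `start`/`end` values in the log above are consistent with tumbling-window assignment, where each record lands in exactly one window whose start is the timestamp rounded down to the window size. A plain-Java sketch (the class name and example timestamp are illustrative, using the 10-second size from the topology below):

```java
public class TumblingWindowDemo {

    // Tumbling (non-overlapping, gap-less) window assignment:
    // a record's window starts at timestamp - (timestamp % windowSizeMs).
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long windowSizeMs = 10_000L;  // TimeWindows.of(10 seconds)
        long ts = 1528043036000L;     // falls inside the first logged window
        long start = windowStart(ts, windowSizeMs);
        // prints Window{start=1528043030000, end=1528043040000}
        System.out.println("Window{start=" + start + ", end=" + (start + windowSizeMs) + "}");
    }
}
```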

It seems that Kafka Streams sends every update to the windowed KTable
downstream as output, and that's probably why there are duplicate outputs
even for gap-less, non-overlapping windows.

Is there any way to achieve real mini-batch-style processing semantics
using non-overlapping windows, i.e. only the final value per window is
sent as output rather than every intermediate changelog update?
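The two behaviors can be contrasted in a plain-Java sketch (this is not the Kafka Streams API; `EmitSemanticsDemo` and its methods are hypothetical names for illustration). Emit-on-update is what produces the `t5  4` / `t5  4,5` pairs in the log above; emit-on-window-close would forward only the final aggregate:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (not the Kafka Streams API): contrasts emit-on-update
// (changelog) semantics with emit-only-on-window-close semantics for one window.
public class EmitSemanticsDemo {

    // Emit-on-update: every aggregation step forwards the updated aggregate.
    public static List<String> emitOnUpdate(String[][] events) {
        Map<String, List<String>> agg = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (String[] e : events) {
            agg.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            out.add(e[0] + "  " + String.join(",", agg.get(e[0])));
        }
        return out;
    }

    // Emit-on-window-close: only the final aggregate per key is forwarded.
    public static List<String> emitOnClose(String[][] events) {
        Map<String, List<String>> agg = new LinkedHashMap<>();
        for (String[] e : events) {
            agg.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
        }
        List<String> out = new ArrayList<>();
        agg.forEach((k, v) -> out.add(k + "  " + String.join(",", v)));
        return out;
    }

    public static void main(String[] args) {
        // Events 4 and 5 from the input: both key t5, same 10-second window.
        String[][] events = {{"t5", "4"}, {"t5", "5"}};
        System.out.println(emitOnUpdate(events)); // [t5  4, t5  4,5]
        System.out.println(emitOnClose(events));  // [t5  4,5]
    }
}
```

In practice, larger `cache.max.bytes.buffering` and `commit.interval.ms` settings reduce (but do not eliminate) the intermediate updates, and Kafka Streams releases from 2.1 onward add `suppress(Suppressed.untilWindowCloses(...))` to emit only final window results.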


On Mon, Jun 4, 2018 at 1:25 AM, EC Boost <ecboost2...@gmail.com> wrote:

> Hello Everyone,
>
> I got duplicate results using Kafka Streams for a simple windowed
> aggregation.
>
> The input event format is comma-separated: "event_id,event_type", and I
> need to aggregate events by type.
>
> Following is the Kafka Stream processing logic:
>
> events
>       .map((k, v) -> KeyValue.pair(v.split(",")[1], v.split(",")[0]))
>       .groupByKey()
>       .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
>       .aggregate(
>         ArrayList::new,
>         (type, id, eventList) -> {
>           eventList.add(id);
>           return eventList;
>         },
>         Materialized.with(stringSerde, arraySerde)
>       )
>       .toStream((k,v) -> k.key())
>       .mapValues((v)-> String.join(",", v))
>       .to("ks-debug-output", Produced.with(stringSerde, stringSerde));
>
>
> I produced the input messages using the following snippet:
>
> require "kafka"
>
> kafka = Kafka.new(["localhost:9092"], client_id: "event-producer")
>
> f = File.open("events.txt")
> f.each_line { |l|
>   puts l
>   kafka.deliver_message("#{l.strip}", topic: "ks-debug-input")
>   sleep(3)
> }
>
>
>
> The messages in events.txt are the following (format: "event_id,event_type",
> and event_id is unique):
>
> Input
>
> 1,t6
> 2,t1
> 3,t7
> 4,t5
> 5,t5
> 6,t6
> 7,t6
> 8,t4
> 9,t6
> 10,t7
> 11,t6
> 12,t5
> 13,t6
> 14,t4
> 15,t4
> 16,t2
> 17,t7
> 18,t6
> 19,t3
> 20,t7
> 21,t1
> 22,t5
> 23,t5
> 24,t6
> 25,t6
> 26,t4
> 27,t4
> 28,t3
> 29,t2
> 30,t5
> 31,t1
> 32,t1
> 33,t1
> 34,t1
> 35,t2
> 36,t4
> 37,t3
> 38,t3
> 39,t6
> 40,t6
> 41,t1
> 42,t4
> 43,t4
> 44,t6
> 45,t6
> 46,t7
> 47,t7
> 48,t3
> 49,t1
> 50,t6
> 51,t1
> 52,t4
> 53,t6
> 54,t7
> 55,t1
> 56,t1
> 57,t1
> 58,t5
> 59,t6
> 60,t7
> 61,t6
> 62,t4
> 63,t5
> 64,t1
> 65,t3
> 66,t1
> 67,t3
> 68,t3
> 69,t5
> 70,t1
> 71,t6
> 72,t5
> 73,t6
> 74,t1
> 75,t7
> 76,t5
> 77,t3
> 78,t1
> 79,t4
> 80,t3
> 81,t6
> 82,t2
> 83,t6
> 84,t2
> 85,t4
> 86,t7
> 87,t4
> 88,t6
> 89,t5
> 90,t6
> 91,t4
> 92,t3
> 93,t4
> 94,t6
> 95,t2
> 96,t2
> 97,t7
> 98,t4
> 99,t3
> 100,t3
>
> <https://gist.github.com/stonegao/087fc0a06fc81177b452a651c16e81c2#output>
>
> But got the following output with duplicate event_ids between windows :
>
> Output
>
> t6    1
> t1    2
> t7    3
> t5    4
> t5    4,5
> t6    6
> t6    6,7
> t4    8
> t6    9
> t7    10
> t6    9,11
> t5    12
> t6    13
> t4    14
> t4    14,15
> t2    16
> t7    17
> t6    18
> t3    19
> t7    20
> t1    21
> t5    22
> t5    22,23
> t6    24
> t6    24,25
> t4    26
> t4    26,27
> t3    28
> t2    29
> t5    30
> t1    31
> t1    32
> t1    32,33
> t1    32,33,34
> t2    35
> t4    36
> t3    37
> t3    37,38
> t6    39
> t6    39,40
> t1    41
> t4    42
> t4    42,43
> t6    44
> t6    44,45
> t7    46
> t7    46,47
> t3    48
> t1    49
> t6    50
> t1    49,51
> t4    52
> t6    53
> t7    54
> t1    55
> t1    56
> t1    56,57
> t5    58
> t6    59
> t7    60
> t6    59,61
> t4    62
> t5    63
> t1    64
> t3    65
> t1    66
> t3    67
> t3    67,68
> t5    69
> t1    70
> t6    71
> t5    72
> t6    73
> t1    74
> t7    75
> t5    76
> t3    77
> t1    78
> t4    79
> t3    80
> t6    81
> t2    82
> t6    83
> t2    82,84
> t4    85
> t7    86
> t4    87
> t6    88
> t5    89
> t6    90
> t4    91
> t3    92
> t4    93
> t6    94
> t2    95
> t2    96
> t7    97
> t4    98
> t3    99
> t3    99,100
>
>
>
> Since I am using non-overlapping, gap-less windows in the Kafka Streams
> DSL, the correct output should NOT contain duplicate event ids between
> windows. Any ideas why the duplicates? (Link to the debug project:
> https://github.com/westec/ks-aggregate-debug )
>
> Thanks for your help!
>
> Regards,
> EC
>
