The input topic contains 60 partitions, and the data is well distributed
across partitions on different keys. While consuming, I am doing some
filtering and writing data for only a single key.
Output would be something of the form:

Machine 1
2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652

Machine 2
2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
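The divergence above (651/652 on machine 1 vs 1/2 on machine 2) is what you would see if each instance were reducing into its own local state store. A minimal sketch of that behaviour, using plain HashMaps rather than the Kafka Streams API (the names here are illustrative, not real API):

```java
import java.util.HashMap;
import java.util.Map;

public class LocalStoreSketch {
    // Illustrative stand-in for the reduce step: add the incoming value
    // to whatever this instance's local store already holds for the key.
    static int reduceInto(Map<String, Integer> store, String key, int v) {
        return store.merge(key, v, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> machine1 = new HashMap<>();
        Map<String, Integer> machine2 = new HashMap<>();
        String key = "P:LIS:1236667:2017_06_13:I";

        machine1.put(key, 650); // machine 1 has already counted 650 records
        // Each machine reduces the next record against its OWN store:
        System.out.println("machine1 v=" + reduceInto(machine1, key, 1)); // 651
        System.out.println("machine2 v=" + reduceInto(machine2, key, 1)); // 1
    }
}
```

If both instances really process records for the same key, their counters diverge exactly like this, because the stores are never merged.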
I am sharing a snippet of the code:
private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
  KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
    List<KeyValue<String, Integer>> l = new ArrayList<>();
    if (value == null) {
      return l;
    }
    String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
    // Line item ids
    if (value.getAdLogType() == 3) {
      // log.info("Invalid data: " + value);
      return l;
    }
    if (value.getAdLogType() == 2) {
      long lineitemid = value.getAdClickLog().getItmClmbLId();
      if (lineitemid == TARGETED_LI) {
        String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
        l.add(new KeyValue<>(liKey, 1));
      }
      return l;
    } else if (value.getAdLogType() == 1) {
      long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
      if (value.getAdImprLog().isVisible()) {
        for (long li : lineitemids) {
          if (li == TARGETED_LI) {
            // log.info("valid impression ids= " + value.getAdImprLog().toString());
            String liKey = String.format("P:LIS:%s:%s:I", li, date);
            l.add(new KeyValue<>(liKey, 1));
          }
        }
      }
      return l;
    }
    return l;
  }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
    .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
  return liCount;
}
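For context on why a single key should only ever reach one instance: the producer's default partitioner derives the partition from the serialized key bytes, so a fixed key always lands in the same partition. A simplified sketch of that mapping (the real implementation in Kafka's DefaultPartitioner uses murmur2 over the key bytes, not hashCode):

```java
public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner: hash the key,
    // force the result non-negative, and take it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "P:LIS:1236667:2017_06_13:I";
        // The same key always maps to the same one of the 60 partitions,
        // so only one consumer instance should ever see its records.
        int p1 = partitionFor(key, 60);
        int p2 = partitionFor(key, 60);
        System.out.println(p1 == p2); // always true for a fixed key
    }
}
```

Because the mapping is deterministic, writing only one key means 59 of the 60 partitions stay empty for this data, and only the instance that owns the one populated partition does any work.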
On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <[email protected]>
wrote:
> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <[email protected]>
> wrote:
>
>> Sameer,
>>
>> if you write a single key, all your input data should be in a single
>> partition. As Streams scales out via partitions, you cannot make use of
>> a second instance, because data from one partition is never split. Thus,
>> all data will go to one instance while the second instance should be idle.
>>
>> So what I don't understand is why you see machine 2 output a counter of
>> 1 -- it should not output anything. Maybe you can give some more details
>> about your setup?
>>
>> -Matthias
>>
>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>> > Hi,
>> >
>> > I witnessed a strange behaviour in Kafka Streams and need help in
>> > understanding it.
>> >
>> > I created an application for aggregating clicks per user, and I want
>> > to process it for only one user (I was writing only a single key).
>> > When I ran the application on one machine, it was running fine. Then,
>> > to load-balance it, I started another node.
>> >
>> > My expectation was that the counters should be in sync, i.e. if the
>> > output from one machine is (key, 100), the other machine should read
>> > this and output (key, 101).
>> > But that didn't happen. Instead, on machine 2, the counter started
>> > at 1.
>> >
>> >
>> > Regards,
>> > -Sameer.
>> >
>>
>>
>