Oh, my bad.
Updating the third predicate in `branch()` may not even be needed.
You could simply do:
KStream<String, CallRecord>[] branches = allRecords
.branch(
(imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
ecord.getCallCommType()),
(imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
cord.getCallCommType())
// Any callRecords that aren't matching any of the two
predicates above will be dropped.
);
This would give you two branched streams instead of three:
KStream<String, CallRecord> voiceRecords = branches[0];
KStream<String, CallRecord> dataRecords = branches[1];
// No third branched stream like before.
Then, to count "everything" (VOICE + DATA + everything else), simply reuse
the original `allRecords` stream.
On Fri, Sep 9, 2016 at 2:23 PM, Michael Noll <[email protected]> wrote:
> Ara,
>
> you have shared this code snippet:
>
> > allRecords.branch(
> > (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
> ecord.getCallCommType()),
> > (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
> cord.getCallCommType()),
> > (imsi, callRecord) -> true
> > );
>
> The branch() operation partitions the allRecords KStream into three
> disjoint streams.
>
> I'd suggest the following.
>
> First, update the third predicate in your `branch()` step to be "everything
> but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA
> records are removed:
>
>
> KStream<String, CallRecord>[] branches = allRecords
> .branch(
> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
> ecord.getCallCommType()),
> (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
> cord.getCallCommType()),
> (imsi, callRecord) -> !(callRecord.getCallCommType().
> equalsIgnoreCase("VOICE") || callRecord.getCallCommType().e
> qualsIgnoreCase("DATA"))
> );
>
> This would give you:
>
> KStream<String, CallRecord> voiceRecords = branches[0];
> KStream<String, CallRecord> dataRecords = branches[1];
> KStream<String, CallRecord> recordsThatAreNeitherVoiceNorData =
> branches[2];
>
> Then, to count "everything" (VOICE + DATA + everything else), simply
> reuse the original `allRecords` stream.
>
> -Michael
>
>
>
>
>
> On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi <[email protected]
> > wrote:
>
>> Let’s say I have this:
>>
>>
>> KStream<String, CallRecord>[] branches = allRecords
>> .branch(
>> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
>> ecord.getCallCommType()),
>> (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
>> cord.getCallCommType()),
>> (imsi, callRecord) -> true
>> );
>> KStream<String, CallRecord> callRecords = branches[0];
>> KStream<String, CallRecord> dataRecords = branches[1];
>> KStream<String, CallRecord> callRecordCounter = branches[2];
>>
>> callRecordCounter
>> .map((imsi, callRecord) -> new KeyValue<>("", ""))
>> .countByKey(
>> UnlimitedWindows.of("counter-window"),
>> stringSerde
>> )
>> .print();
>>
>> Here I has 3 branches. Branch 0 is triggered if data is VOICE, branch 1
>> if data is DATA. Branch 2 is supposed to get triggered regardless of type
>> all the type so that then I can count stuff for a time window. BUT the
>> problem is branch is implemented like this:
>>
>> private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
>> @Override
>> public void process(K key, V value) {
>> for (int i = 0; i < predicates.length; i++) {
>> if (predicates[i].test(key, value)) {
>> // use forward with childIndex here and then break the
>> loop
>> // so that no record is going to be piped to multiple
>> streams
>> context().forward(key, value, i);
>> break;
>> }
>> }
>> }
>> }
>>
>> Note the break. So the counter branch is never reached. I’d like to
>> change the behavior of branch so that all predicates are checked and no
>> break happens, in say a branchAll() method. What’s the easiest way to this
>> functionality to the DSL? I tried process() but it doesn’t return KStream.
>>
>> Ara.
>>
>>
>>
>>
>> ________________________________
>>
>> This message is for the designated recipient only and may contain
>> privileged, proprietary, or otherwise confidential information. If you have
>> received it in error, please notify the sender immediately and delete the
>> original. Any other use of the e-mail by you is prohibited. Thank you in
>> advance for your cooperation.
>>
>> ________________________________
>>
>
>
>