Oh, my bad. Updating the third predicate in `branch()` may not even be needed.
You could simply do: KStream<String, CallRecord>[] branches = allRecords .branch( (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR ecord.getCallCommType()), (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe cord.getCallCommType()) // Any callRecords that aren't matching any of the two predicates above will be dropped. ); This would give you two branched streams instead of three: KStream<String, CallRecord> voiceRecords = branches[0]; KStream<String, CallRecord> dataRecords = branches[1]; // No third branched stream like before. Then, to count "everything" (VOICE + DATA + everything else), simply reuse the original `allRecords` stream. On Fri, Sep 9, 2016 at 2:23 PM, Michael Noll <mich...@confluent.io> wrote: > Ara, > > you have shared this code snippet: > > > allRecords.branch( > > (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR > ecord.getCallCommType()), > > (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe > cord.getCallCommType()), > > (imsi, callRecord) -> true > > ); > > The branch() operation partitions the allRecords KStream into three > disjoint streams. > > I'd suggest the following. > > First, update the third predicate in your `branch()` step to be "everything > but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA > records are removed: > > > KStream<String, CallRecord>[] branches = allRecords > .branch( > (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR > ecord.getCallCommType()), > (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe > cord.getCallCommType()), > (imsi, callRecord) -> !(callRecord.getCallCommType(). > equalsIgnoreCase("VOICE") || callRecord.getCallCommType().e > qualsIgnoreCase("DATA")) > ); > > This would give you: > > KStream<String, CallRecord> voiceRecords = branches[0]; > KStream<String, CallRecord> dataRecords = branches[1]; > KStream<String, CallRecord> recordsThatAreNeitherVoiceNorData = > branches[2]; > > Then, to count "everything" (VOICE + DATA + everything else), simply > reuse the original `allRecords` stream. > > -Michael > > > > > > On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com > > wrote: > >> Let’s say I have this: >> >> >> KStream<String, CallRecord>[] branches = allRecords >> .branch( >> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR >> ecord.getCallCommType()), >> (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe >> cord.getCallCommType()), >> (imsi, callRecord) -> true >> ); >> KStream<String, CallRecord> callRecords = branches[0]; >> KStream<String, CallRecord> dataRecords = branches[1]; >> KStream<String, CallRecord> callRecordCounter = branches[2]; >> >> callRecordCounter >> .map((imsi, callRecord) -> new KeyValue<>("", "")) >> .countByKey( >> UnlimitedWindows.of("counter-window"), >> stringSerde >> ) >> .print(); >> >> Here I has 3 branches. Branch 0 is triggered if data is VOICE, branch 1 >> if data is DATA. Branch 2 is supposed to get triggered regardless of type >> all the type so that then I can count stuff for a time window. BUT the >> problem is branch is implemented like this: >> >> private class KStreamBranchProcessor extends AbstractProcessor<K, V> { >> @Override >> public void process(K key, V value) { >> for (int i = 0; i < predicates.length; i++) { >> if (predicates[i].test(key, value)) { >> // use forward with childIndex here and then break the >> loop >> // so that no record is going to be piped to multiple >> streams >> context().forward(key, value, i); >> break; >> } >> } >> } >> } >> >> Note the break. So the counter branch is never reached. I’d like to >> change the behavior of branch so that all predicates are checked and no >> break happens, in say a branchAll() method. What’s the easiest way to this >> functionality to the DSL? I tried process() but it doesn’t return KStream. >> >> Ara. >> >> >> >> >> ________________________________ >> >> This message is for the designated recipient only and may contain >> privileged, proprietary, or otherwise confidential information. If you have >> received it in error, please notify the sender immediately and delete the >> original. Any other use of the e-mail by you is prohibited. Thank you in >> advance for your cooperation. >> >> ________________________________ >> > > >