Ara, you have shared this code snippet:
> allRecords.branch(
>     (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>     (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>     (imsi, callRecord) -> true
> );

The branch() operation partitions the allRecords KStream into three
disjoint streams: each record goes to the first predicate that matches it,
and to no other.

I'd suggest the following.

First, update the third predicate in your `branch()` step to be "everything
but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA
records are removed:

    KStream<String, CallRecord>[] branches = allRecords
        .branch(
            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
            // Constant-first comparison, as in the predicates above, so a null type cannot throw
            (imsi, callRecord) -> !("VOICE".equalsIgnoreCase(callRecord.getCallCommType())
                || "DATA".equalsIgnoreCase(callRecord.getCallCommType()))
        );

This would give you:

    KStream<String, CallRecord> voiceRecords = branches[0];
    KStream<String, CallRecord> dataRecords = branches[1];
    KStream<String, CallRecord> recordsThatAreNeitherVoiceNorData = branches[2];

Then, to count "everything" (VOICE + DATA + everything else), simply reuse
the original `allRecords` stream.

-Michael

On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

> Let’s say I have this:
>
>     KStream<String, CallRecord>[] branches = allRecords
>         .branch(
>             (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>             (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>             (imsi, callRecord) -> true
>         );
>     KStream<String, CallRecord> callRecords = branches[0];
>     KStream<String, CallRecord> dataRecords = branches[1];
>     KStream<String, CallRecord> callRecordCounter = branches[2];
>
>     callRecordCounter
>         .map((imsi, callRecord) -> new KeyValue<>("", ""))
>         .countByKey(
>             UnlimitedWindows.of("counter-window"),
>             stringSerde
>         )
>         .print();
>
> Here I have 3 branches. Branch 0 is triggered if data is VOICE, branch 1 if
> data is DATA.
> Branch 2 is supposed to get triggered regardless of the type, so that I
> can then count stuff for a time window. BUT the problem is that branch is
> implemented like this:
>
>     private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
>         @Override
>         public void process(K key, V value) {
>             for (int i = 0; i < predicates.length; i++) {
>                 if (predicates[i].test(key, value)) {
>                     // use forward with childIndex here and then break the loop
>                     // so that no record is going to be piped to multiple streams
>                     context().forward(key, value, i);
>                     break;
>                 }
>             }
>         }
>     }
>
> Note the break. So the counter branch is never reached for VOICE or DATA
> records. I’d like to change the behavior of branch so that all predicates
> are checked and no break happens, in say a branchAll() method. What’s the
> easiest way to add this functionality to the DSL? I tried process() but it
> doesn’t return a KStream.
>
> Ara.
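
P.S. To make the first-match rule discussed above concrete, here is a minimal plain-Java sketch. It does not use Kafka Streams at all: `BranchSketch` and its `branch` helper are hypothetical names invented for illustration, and plain strings stand in for the call type of each CallRecord. It only mimics the loop-and-break logic quoted above, and shows that counting "everything" needs the original collection rather than a third branch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for the first-match behavior of branch(); not Kafka Streams code.
final class BranchSketch {

    // Sends each record to the first predicate that accepts it, then stops (the "break").
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> records, Predicate<T>... predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    branches.get(i).add(record);
                    break; // no record is piped to more than one branch
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        // Strings stand in for callRecord.getCallCommType() values (assumed sample data).
        List<String> allRecords = Arrays.asList("VOICE", "data", "SMS", "voice");

        List<List<String>> branches = branch(allRecords,
            type -> "VOICE".equalsIgnoreCase(type),
            type -> "DATA".equalsIgnoreCase(type),
            type -> true); // catch-all only ever sees what the first two rejected

        System.out.println(branches.get(0)); // [VOICE, voice]
        System.out.println(branches.get(1)); // [data]
        System.out.println(branches.get(2)); // [SMS]

        // To count everything, use the original collection, not branch 2.
        System.out.println(allRecords.size()); // 4
    }
}
```

In this sketch the catch-all predicate receives only the "SMS" record, which is why the total is taken from `allRecords` itself.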