Oh, my bad.

Updating the third predicate in `branch()` may not even be needed.

You could simply do:

    KStream<String, CallRecord>[] branches = allRecords
        .branch(
            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType())
            // Any callRecords that aren't matching any of the two predicates above
            // will be dropped.
        );

This would give you two branched streams instead of three:

    KStream<String, CallRecord> voiceRecords = branches[0];
    KStream<String, CallRecord> dataRecords = branches[1];
    // No third branched stream like before.
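
To see the "first match wins, everything else is dropped" behaviour in isolation, here is a minimal plain-Java sketch. It only models the routing logic with lists; it is not the Kafka Streams API, and the class name `BranchModel` and the sample call types are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of first-match-wins branching. NOT the Kafka Streams API.
final class BranchModel {

    // Routes each record to the list of the first predicate it satisfies.
    // Records that satisfy no predicate are dropped.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> records, Predicate<T>... predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    branches.get(i).add(record);
                    break; // first match wins, later predicates are not tested
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        // Hypothetical call types standing in for callRecord.getCallCommType().
        List<String> allTypes = Arrays.asList("VOICE", "data", "SMS", "voice");
        List<List<String>> branches = branch(allTypes,
                "VOICE"::equalsIgnoreCase,
                "DATA"::equalsIgnoreCase);
        System.out.println("voice: " + branches.get(0));
        System.out.println("data:  " + branches.get(1));
        // "SMS" matched neither predicate, so it appears in no branch.
    }
}
```

With only two predicates, the "SMS" entry ends up in neither list, which mirrors the dropped records mentioned above.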

Then, to count "everything" (VOICE + DATA + everything else), simply reuse
the original `allRecords` stream.
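
For comparison, the "check every predicate" behaviour asked about in the original question can be thought of as independent filters over the same source, which in the DSL would roughly correspond to calling `filter()` on `allRecords` once per predicate. The sketch below is plain Java over a list and only models that idea; `FanOutModel` and the sample call types are hypothetical names, not part of Kafka Streams.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java model of applying independent filters to one source.
// NOT the Kafka Streams API.
final class FanOutModel {

    // Returns the records that satisfy the predicate; the source is untouched,
    // so it can be filtered (or counted) again as often as needed.
    static <T> List<T> filter(List<T> records, Predicate<T> predicate) {
        return records.stream().filter(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical call types standing in for callRecord.getCallCommType().
        List<String> allTypes = Arrays.asList("VOICE", "data", "SMS", "voice");

        List<String> voice = filter(allTypes, "VOICE"::equalsIgnoreCase);
        List<String> data = filter(allTypes, "DATA"::equalsIgnoreCase);

        System.out.println("voice count: " + voice.size());
        System.out.println("data count:  " + data.size());
        // Counting "everything" just reuses the original source.
        System.out.println("total count: " + allTypes.size());
    }
}
```

Because each filter reads the full source, no record is lost to an earlier predicate, and the total is taken from the source itself rather than from a catch-all branch.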



On Fri, Sep 9, 2016 at 2:23 PM, Michael Noll <mich...@confluent.io> wrote:

> Ara,
>
> you have shared this code snippet:
>
> >    allRecords.branch(
> >            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
> >            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
> >            (imsi, callRecord) -> true
> >    );
>
> The branch() operation partitions the allRecords KStream into three
> disjoint streams.
>
> I'd suggest the following.
>
> First, update the third predicate in your `branch()` step to be "everything
> but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA
> records are removed:
>
>
>     KStream<String, CallRecord>[] branches = allRecords
>         .branch(
>             (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>             (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>             (imsi, callRecord) -> !(callRecord.getCallCommType().equalsIgnoreCase("VOICE")
>                 || callRecord.getCallCommType().equalsIgnoreCase("DATA"))
>         );
>
> This would give you:
>
>     KStream<String, CallRecord> voiceRecords = branches[0];
>     KStream<String, CallRecord> dataRecords = branches[1];
>     KStream<String, CallRecord> recordsThatAreNeitherVoiceNorData = branches[2];
>
> Then, to count "everything" (VOICE + DATA + everything else), simply
> reuse the original `allRecords` stream.
>
> -Michael
>
>
>
>
>
> On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
>> Let’s say I have this:
>>
>>
>> KStream<String, CallRecord>[] branches = allRecords
>>     .branch(
>>             (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>>             (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>>             (imsi, callRecord) -> true
>>     );
>> KStream<String, CallRecord> callRecords = branches[0];
>> KStream<String, CallRecord> dataRecords = branches[1];
>> KStream<String, CallRecord> callRecordCounter = branches[2];
>>
>> callRecordCounter
>>         .map((imsi, callRecord) -> new KeyValue<>("", ""))
>>         .countByKey(
>>                 UnlimitedWindows.of("counter-window"),
>>                 stringSerde
>>         )
>>         .print();
>>
>> Here I have 3 branches. Branch 0 is triggered if the record is VOICE,
>> branch 1 if the record is DATA. Branch 2 is supposed to be triggered
>> regardless of type, so that I can then count records for a time window.
>> BUT the problem is that branch is implemented like this:
>>
>> private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
>>     @Override
>>     public void process(K key, V value) {
>>         for (int i = 0; i < predicates.length; i++) {
>>             if (predicates[i].test(key, value)) {
>>                 // use forward with childIndex here and then break the loop
>>                 // so that no record is going to be piped to multiple streams
>>                 context().forward(key, value, i);
>>                 break;
>>             }
>>         }
>>     }
>> }
>>
>> Note the break. So the counter branch is never reached. I’d like to
>> change the behavior of branch so that all predicates are checked and no
>> break happens, in say a branchAll() method. What’s the easiest way to add
>> this functionality to the DSL? I tried process() but it doesn’t return a
>> KStream.
>>
>> Ara.
