Hi Team,

I am working on a story where I have to send the records from the aggregation 
processor along with uuid, I have a deque topic, and adding the records to the 
deque topic and forwarding it from the Main Aggregate class. But when we do 
perf testing the TPS is coming around 1200/1300 but in general without this 
implementation present in the code the TPS is around 5800/5900.

just want to make sure the implementation is in right way.


I am attaching the code snippets from the two classes here:

Main Class: Aggregate class

The punctuator value I used is 1 sec and 1 min for both the time TPS almost 
similar.


if(processedDestination!=null) {
    KStream<String, GenericRecord>[] postBranchArray = 
getPostBranchStreamsArrayForProcessedTopic(stream);
    forwardStreamToProcessedDestination(postBranchArray[1], 
processedDestination);
}

private KStream<String, GenericRecord>[] 
getPostBranchStreamsArrayForProcessedTopic(KStream<String, GenericRecord> 
stream) {
    return stream
            .process(() -> new Processor<String, GenericRecord, String, 
GenericRecord>() {
                ProcessorContext context;

                public void close() {
                    KeyValueHeader entry;
                    while ((entry = processedTopicDeque.poll()) != null) {
                        Headers headers = new RecordHeaders();

                        // add the stored headers for processed topic records 
to the context
                        entry.getHeaders().forEach(h -> headers.add(h));
                        context.forward(new Record(entry.getKey(), 
entry.getValue(), context.currentStreamTimeMs(), headers));
                    }
                }

                public void init(ProcessorContext context) {
                    this.context = context;
                    
this.context.schedule(DEFAULT_PUNCTUATION_INTERVAL_PROCESSEDTOPIC, 
WALL_CLOCK_TIME, timestamp -> {
                        KeyValueHeader entry;
                        while ((entry = processedTopicDeque.poll()) != null) {
                            Headers headers = new RecordHeaders();

                            // add the stored headers for processed topic 
records to the context
                            entry.getHeaders().forEach(h -> headers.add(h));
                            context.forward(new Record(entry.getKey(), 
entry.getValue(), timestamp, headers));
                        }
                    });
                }

                public void process(Record<String, GenericRecord> record) {
                    context.forward(record);
                }
            })
            .branch((k, v) -> Objects.equals(aggregateSchema, v.getSchema()), 
(k, v) -> true);
}


protected void forwardStreamToProcessedDestination(KStream<K,V> stream, String 
processedDestination) {
    createProcessor(stream, false)
            .to(processedDestination);
}


The Time window processor class:

Just adding the records to deque topic:

public void process(Record<Windowed<K>, V> record) {

//other code

if(Boolean.TRUE.equals(enableProcessedDestination)){
    processedTopicDeque.add(new 
KeyValueHeader<>(key.key(),value,headers.get()));
}
//other code
}




Kind Regards,

Vinay Reddy Pannala

Software Developer

vinay...@amdocs.com<mailto:carlos.ry...@amdocs.com>

www.zinkworks.com<http://www.zinkworks.com/>

[cid:f87d3560-fabe-4c8a-85df-dc9ba2830d68]
This email and the information contained herein is proprietary and confidential 
and subject to the Amdocs Email Terms of Service, which you may review at 
https://www.amdocs.com/about/email-terms-of-service 
<https://www.amdocs.com/about/email-terms-of-service>

Reply via email to