Hi Team,

I am working on a story where I have to send the records from the aggregation processor along with a UUID. I keep a deque for the processed topic (processedTopicDeque): the time window processor adds the records to this deque, and the main Aggregate class forwards them. However, when we run performance tests with this change, the TPS comes out at around 1200 to 1300, whereas without this implementation in the code the TPS is around 5800 to 5900.
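To make the pattern easier to discuss, here is a minimal sketch of the hand-off with all Kafka types stripped out. The class name DequeHandOff and its methods are hypothetical and exist only for this illustration; the use of ConcurrentLinkedDeque is also an assumption, since the producer side and the draining side may not run on the same thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;

// Hypothetical stand-in for the hand-off; not part of Kafka Streams or of the real code.
final class DequeHandOff<T> {

    // Assumption: a thread-safe deque, in case add() and drain() run on different threads.
    private final Queue<T> buffer = new ConcurrentLinkedDeque<>();

    // Producer side: corresponds to the add() call in the time window processor.
    void add(T entry) {
        buffer.add(entry);
    }

    // Consumer side: corresponds to the loop in the punctuator and in close().
    // Polls until the deque is empty and returns how many entries were forwarded.
    int drain(Consumer<T> forward) {
        int forwarded = 0;
        T entry;
        while ((entry = buffer.poll()) != null) {
            forward.accept(entry);
            forwarded++;
        }
        return forwarded;
    }

    // Number of entries still waiting to be forwarded.
    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        DequeHandOff<String> handOff = new DequeHandOff<>();
        handOff.add("record-1");
        handOff.add("record-2");

        // Stand-in for context.forward(...): collect the entries into a list.
        List<String> forwardedRecords = new ArrayList<>();
        int count = handOff.drain(forwardedRecords::add);

        System.out.println("forwarded " + count + " entries: " + forwardedRecords);
        System.out.println("still pending: " + handOff.pending());
    }
}
```

In the real code the same drain loop appears twice, once in the punctuator and once in close(); a single helper like drain() above would be one way to keep the two in sync.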
I just want to make sure the implementation is done the right way. I am attaching the code snippets from the two classes below. I tried punctuation intervals of both 1 second and 1 minute, and the TPS is almost the same in both cases.

Main class (Aggregate class):

    if (processedDestination != null) {
        KStream<String, GenericRecord>[] postBranchArray = getPostBranchStreamsArrayForProcessedTopic(stream);
        forwardStreamToProcessedDestination(postBranchArray[1], processedDestination);
    }

    private KStream<String, GenericRecord>[] getPostBranchStreamsArrayForProcessedTopic(KStream<String, GenericRecord> stream) {
        return stream
            .process(() -> new Processor<String, GenericRecord, String, GenericRecord>() {
                ProcessorContext context;

                public void close() {
                    KeyValueHeader entry;
                    while ((entry = processedTopicDeque.poll()) != null) {
                        Headers headers = new RecordHeaders();
                        // add the stored headers for processed topic records to the context
                        entry.getHeaders().forEach(h -> headers.add(h));
                        context.forward(new Record(entry.getKey(), entry.getValue(), context.currentStreamTimeMs(), headers));
                    }
                }

                public void init(ProcessorContext context) {
                    this.context = context;
                    this.context.schedule(DEFAULT_PUNCTUATION_INTERVAL_PROCESSEDTOPIC, WALL_CLOCK_TIME, timestamp -> {
                        KeyValueHeader entry;
                        while ((entry = processedTopicDeque.poll()) != null) {
                            Headers headers = new RecordHeaders();
                            // add the stored headers for processed topic records to the context
                            entry.getHeaders().forEach(h -> headers.add(h));
                            context.forward(new Record(entry.getKey(), entry.getValue(), timestamp, headers));
                        }
                    });
                }

                public void process(Record<String, GenericRecord> record) {
                    context.forward(record);
                }
            })
            .branch((k, v) -> Objects.equals(aggregateSchema, v.getSchema()), (k, v) -> true);
    }

    protected void forwardStreamToProcessedDestination(KStream<K,V> stream, String processedDestination) {
        createProcessor(stream, false)
            .to(processedDestination);
    }

The time window processor class, which just adds the records to the deque:

    public void process(Record<Windowed<K>, V> record) {
        //other code
        if (Boolean.TRUE.equals(enableProcessedDestination)) {
            processedTopicDeque.add(new KeyValueHeader<>(key.key(), value, headers.get()));
        }
        //other code
    }

Kind Regards,
Vinay Reddy Pannala
Software Developer