Hi Team,
I am working on a story where I have to send the records from the aggregation
processor, along with a UUID, to a processed topic. To do this I keep a deque
(processedTopicDeque): the time window processor adds the records to it, and
the main Aggregate class forwards them. With this change in place, perf
testing gives a TPS of around 1200-1300, whereas without this implementation
in the code the TPS is around 5800-5900.
I just want to make sure the implementation is done the right way.
I am attaching the code snippets from the two classes here:
Main class: Aggregate class
I tried punctuation intervals of 1 sec and 1 min; the TPS was almost the same
in both cases.
if (processedDestination != null) {
    KStream<String, GenericRecord>[] postBranchArray =
        getPostBranchStreamsArrayForProcessedTopic(stream);
    forwardStreamToProcessedDestination(postBranchArray[1],
        processedDestination);
}
private KStream<String, GenericRecord>[]
getPostBranchStreamsArrayForProcessedTopic(KStream<String, GenericRecord> stream) {
    return stream
        .process(() -> new Processor<String, GenericRecord, String, GenericRecord>() {
            ProcessorContext<String, GenericRecord> context;
            public void close() {
                // drain whatever is still buffered when the processor closes
                KeyValueHeader entry;
                while ((entry = processedTopicDeque.poll()) != null) {
                    Headers headers = new RecordHeaders();
                    // add the stored headers for processed topic records
                    // to the forwarded record
                    entry.getHeaders().forEach(h -> headers.add(h));
                    context.forward(new Record<>(entry.getKey(),
                        entry.getValue(), context.currentStreamTimeMs(), headers));
                }
            }
            public void init(ProcessorContext<String, GenericRecord> context) {
                this.context = context;
                // drain the deque on every wall-clock punctuation
                this.context.schedule(DEFAULT_PUNCTUATION_INTERVAL_PROCESSEDTOPIC,
                    WALL_CLOCK_TIME, timestamp -> {
                        KeyValueHeader entry;
                        while ((entry = processedTopicDeque.poll()) != null) {
                            Headers headers = new RecordHeaders();
                            // add the stored headers for processed topic records
                            // to the forwarded record
                            entry.getHeaders().forEach(h -> headers.add(h));
                            context.forward(new Record<>(entry.getKey(),
                                entry.getValue(), timestamp, headers));
                        }
                    });
            }
            public void process(Record<String, GenericRecord> record) {
                // regular records pass through unchanged
                context.forward(record);
            }
        })
        .branch((k, v) -> Objects.equals(aggregateSchema, v.getSchema()),
            (k, v) -> true);
}
protected void forwardStreamToProcessedDestination(KStream<K,V> stream,
        String processedDestination) {
    createProcessor(stream, false)
        .to(processedDestination);
}
Time window processor class:
This only adds the records to the deque:
public void process(Record<Windowed<K>, V> record) {
    //other code
    if (Boolean.TRUE.equals(enableProcessedDestination)) {
        processedTopicDeque.add(new
            KeyValueHeader<>(key.key(), value, headers.get()));
    }
    //other code
}
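To make the hand-off between the two classes easier to discuss, here is a
minimal sketch of the same pattern in plain Java, with no Kafka Streams
classes involved. The names DequeHandOffSketch, Entry, add and drain are made
up for illustration and are not in our code base; in the real code the drain
loop runs inside the punctuator and calls context.forward, and I am assuming
here that the deque is a ConcurrentLinkedDeque.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;

public class DequeHandOffSketch {

    /** Hypothetical stand-in for the KeyValueHeader entries kept in the deque. */
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Shared buffer: one side adds, the other side polls until it is empty.
    // (Assumption: a concurrent deque; the real declaration is not shown above.)
    private final Queue<Entry> buffer = new ConcurrentLinkedDeque<>();

    /** Models what the time window processor does in process(). */
    void add(String key, String value) {
        buffer.add(new Entry(key, value));
    }

    /**
     * Models the punctuator body: poll until the buffer is empty and hand each
     * entry to the downstream consumer. Returns how many entries were forwarded.
     */
    int drain(Consumer<Entry> downstream) {
        int forwarded = 0;
        Entry entry;
        while ((entry = buffer.poll()) != null) {
            downstream.accept(entry);
            forwarded++;
        }
        return forwarded;
    }

    public static void main(String[] args) {
        DequeHandOffSketch sketch = new DequeHandOffSketch();
        List<String> sink = new ArrayList<>();

        sketch.add("k1", "v1");
        sketch.add("k2", "v2");

        // Nothing has reached the sink yet; entries wait for the next drain.
        int forwarded = sketch.drain(e -> sink.add(e.key + "=" + e.value));
        System.out.println("forwarded " + forwarded + " entries: " + sink);
    }
}
```

In this model nothing reaches the downstream consumer until drain is called,
which mirrors how the records in my code wait in the deque until the next
punctuation.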
Kind Regards,
Vinay Reddy Pannala
Software Developer
www.zinkworks.com