Hi, I have created a new operator which converts avro message to json message by extending base operator. I'm unable to use AvroToPojoOperator due to the complexity of avro structure. I'm trying to analyze its performance.
When I use: VCORES MEMORY Tuples Emitted MA CPU% 1 2048 3100 98% 1 4096 3200 98% 2 2048 3100 98% 2 4096 3200 98% *All the above Emitted, CPU% are approximates based on simple tests. >From the above analysis, increase in number of cores or memory didn't affect CPU or tuples emitted. My assumption was increasing VCORES will actually reduce the CPU or may be increase emitted tuples. One of my assumption is that, the operator is not actually using 2GB of memory as its a CPU intense operation, so I get that increase of memory really doesn't affect tuples emitted. But, increase in VCORES should reduce the CPU% and increase tuples emitted as now more cores are available for processing. Is my above assumption wrong? If so, what can be done to reduce CPU% a part from partitioning the operator? Below is my code for custom operator. public class CustomAvroToStringOperator extends BaseOperator { @NotNull private String schemaRegistryUrl; private transient KafkaAvroDeserializer avroDeserializer; public String getSchemaRegistryUrl() { return schemaRegistryUrl; } public void setSchemaRegistryUrl(String schemaRegistryUrl) { this.schemaRegistryUrl = schemaRegistryUrl; } public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>(); public final transient DefaultInputPort<byte[]> inputPort= new DefaultInputPort<byte[]>() { @Override public void process(byte[] tuple) { outputPort.emit(avroDeserializer.deserialize("topic", tuple).toString()); } }; @Override public void setup(Context.OperatorContext context) { avroDeserializer = setUpSchemaRegistry(); super.setup(context); } private KafkaAvroDeserializer setUpSchemaRegistry() { final Map<String, String> config = new HashMap<>(); config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false"); return new KafkaAvroDeserializer(new CachedSchemaRegistryClient(schemaRegistryUrl, 1000), config); } } Regards Vivek -- Sent from: http://apache-apex-users-list.78494.x6.nabble.com/