I changed to the route below, and it took 20 seconds to produce 500K keyed
messages into the Kafka topic. However, a pure Java application produced the
same number of keyed messages in only 10 to 11 seconds. Is there a way to get
the same performance as pure Java in Camel?
I even tried running the same pure Java code inside a processor, but it took
22 seconds. I'm not sure which part went wrong.

Revised Route:

from("vm:msg")
.split(body())
.parallelProcessing()
.process("SetKeyProcessor")
.to("kafka:....")
.end()
.log("${header.Counter}");


--------------------------------------------------------------------------------------------------------
Pure Java Code:
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PureJavaProducer { // class name is illustrative
    public static void main(String[] args) {
        int recordCt = 0;
        long startTime = System.currentTimeMillis();

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "1"); // only the leader replica must acknowledge the write
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); // idempotence off: retries may write duplicates
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "0"); // was 1; ~10 ms would allow more batching
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); // default batch size (bytes)

        String jsonFilePath = "d:\\temp\\test.json"; // 500K messages, file size 400MB

        // try-with-resources closes (and flushes) the producer and the reader at the end
        try (Producer<String, String> producer = new KafkaProducer<>(properties);
             JsonReader jsonReader = new JsonReader(new FileReader(jsonFilePath))) {
            Gson gson = new GsonBuilder().create();
            jsonReader.beginArray(); // start of JSON array
            while (jsonReader.hasNext()) { // next JSON array element
                String message = gson.fromJson(jsonReader, JsonObject.class).toString();
                recordCt += 1;
                // send() is asynchronous: records are buffered and batched by the client
                producer.send(new ProducerRecord<>("topic_name", "instanceKey", message));
                if (recordCt % 10000 == 0) {
                    System.out.println("Published -> " + String.format("%,d", recordCt));
                }
            }
            jsonReader.endArray();
        } catch (IOException e) {
            e.printStackTrace();
        }

        System.out.println("");
        System.out.println("Total records published to topic: " + String.format("%,d", recordCt));
        long durationInSecs = (System.currentTimeMillis() - startTime) / 1000;
        System.out.println("");
        System.out.println("Total elapsed time to process: " + durationInSecs + " seconds");
        System.out.println("");
    }
}
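The reported timings can be expressed as average throughput to make the gap
concrete: 500K messages in 20 seconds is 25,000 messages per second for the
split route, versus roughly 45,000 to 50,000 per second for the plain producer
at 10 to 11 seconds. A trivial sketch; the class and method names are only
illustrative.

```java
public class Throughput {
    // Average messages per second for a run of `messages` records taking `seconds`.
    static double perSecond(long messages, double seconds) {
        return messages / seconds;
    }

    public static void main(String[] args) {
        long messages = 500_000;
        System.out.printf("Camel split route (20 s): %,.0f msg/s%n", perSecond(messages, 20));
        System.out.printf("Camel + pure-Java processor (22 s): %,.0f msg/s%n", perSecond(messages, 22));
        System.out.printf("Pure Java (10 s): %,.0f msg/s%n", perSecond(messages, 10));
        System.out.printf("Pure Java (11 s): %,.0f msg/s%n", perSecond(messages, 11));
    }
}
```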


Thank you,

    On Thursday, November 19, 2020, 04:54:30 PM EST, Site Register 
<site.regis...@ymail.com.invalid> wrote:  
 
 Hi Camel Users,

I can set a key for an individual message using the KafkaConstants.KEY header.
However, the whole process took an extremely long time (more than 20 minutes)
to produce 500K keyed messages into the Kafka topic. I also tried producing the
messages in batches without assigning a key, which took 14 seconds for 500K
messages. So my question is: how can I assign a key to the batched messages?
Processor (GsonStreamProcessor):
if (numberOfRecords % batchSize == 0) {
    List<String> batchList = new ArrayList<String>();
    exchange.getIn().setBody(batchList);
    exchange.getIn().setHeader(KafkaConstants.KEY, "key-" + keyCount); // This line does not work with batch messages
    ProducerTemplate template = exchange.getContext().createProducerTemplate();
    template.send("vm:msg", exchange);
}
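A key that should apply per message has to travel with each element of the
batch rather than as one header on the whole list. The plain-Java sketch below
(hypothetical class `KeyedBatches`, no Camel or Kafka APIs) mirrors the
processor's batching and pairs every message with the key of its batch,
"key-" + keyCount. Whether camel-kafka reads a key from individual list
elements (for example, entries that are Camel Message objects carrying
KafkaConstants.KEY) depends on the component version and is an assumption to
verify, not something confirmed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class KeyedBatches {
    // Groups messages into batches of `batchSize` and pairs every message with
    // its batch key ("key-" + batch index), so each element carries its own key.
    static List<List<Map.Entry<String, String>>> batch(List<String> messages, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<Map.Entry<String, String>>> batches = new ArrayList<>();
        List<Map.Entry<String, String>> current = new ArrayList<>();
        int keyCount = 0;
        for (String message : messages) {
            current.add(Map.entry("key-" + keyCount, message));
            if (current.size() == batchSize) { // batch full: hand it off, start the next key
                batches.add(current);
                current = new ArrayList<>();
                keyCount++;
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // trailing partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("{\"a\":1}", "{\"a\":2}", "{\"a\":3}", "{\"a\":4}", "{\"a\":5}");
        for (List<Map.Entry<String, String>> b : batch(messages, 2)) {
            System.out.println(b);
        }
    }
}
```

Unlike a bare numberOfRecords % batchSize == 0 check, the sketch also emits the
final partial batch, which would otherwise need a separate flush once the input
is exhausted.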

Route:
from("direct:start")
.process("GsonStreamProcessor")
.log("Done");
from("vm:msg")
.to("kafka:.....");

Thank you,


  
