I changed to the route below, and it took 20 seconds to produce 500K keyed messages into the Kafka topic. However, a pure Java application producing the same number of keyed messages took only 10 to 11 seconds. Is there a way to get the same performance as pure Java in Camel? I even tried running the same pure Java code inside a processor, but that took 22 seconds. I'm not sure which part went wrong.
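For scale, the timings reported in this thread can be turned into rough throughput figures. The sketch below is plain Java with no Kafka or Camel dependency. The class name `ThroughputEstimate` is hypothetical, the pure Java run is taken at the midpoint of 10 to 11 s, and it assumes the 400 MB file size is a fair proxy for the total payload size (Gson's `toString()` may make the actual records somewhat smaller).

```java
// Hypothetical helper: converts the timings quoted in this thread into
// messages per second and estimates how many records fit in one batch.
public class ThroughputEstimate {

    // Messages per second for a run of `messages` records taking `seconds`.
    public static double messagesPerSecond(long messages, double seconds) {
        return messages / seconds;
    }

    // Average payload size per record, in bytes (integer division).
    public static long avgRecordBytes(long totalBytes, long messages) {
        return totalBytes / messages;
    }

    // Number of whole records of `recordBytes` that fit in one producer batch.
    public static long recordsPerBatch(long batchSizeBytes, long recordBytes) {
        return batchSizeBytes / recordBytes;
    }

    public static void main(String[] args) {
        long messages = 500_000;
        String[] runs = {
            "pure Java producer (10 to 11 s)",
            "Camel batch without key",
            "Camel split route with key",
            "Camel processor running pure Java code",
            "Camel per-message key (>20 min, upper bound)"};
        double[] seconds = {10.5, 14, 20, 22, 20 * 60};
        for (int i = 0; i < runs.length; i++) {
            System.out.printf("%-46s %6.1f s %,9.0f msg/s%n",
                    runs[i], seconds[i], messagesPerSecond(messages, seconds[i]));
        }
        // Assumption: the 400 MB file size approximates the total payload size.
        long recordBytes = avgRecordBytes(400_000_000L, messages);
        System.out.println("Average record size: " + recordBytes + " bytes");
        System.out.println("Records per 16340-byte batch: "
                + recordsPerBatch(16_340, recordBytes));
    }
}
```

With these inputs the Camel split route runs at about 25,000 msg/s versus roughly 47,600 msg/s for the pure Java producer, and an average record of about 800 bytes means a 16340-byte `batch.size` holds only about 20 records, which suggests `batch.size` and `linger.ms` are worth experimenting with on both sides.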
Revised Route:

from("vm:msg")
    .split(body())
        .parallelProcessing()
        .process("SetKeyProcessor")   // registry bean that sets the Kafka key on each split message
        .to("kafka:....")
    .end()
    .log("${header.Counter}");

--------------------------------------------------------------------------------------------------------

Pure Java Code:

import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Class and method names are placeholders; the post showed only the method body.
public class PureJavaProducer {
    public static void main(String[] args) {
        int recordCt = 0;
        long startTime = System.currentTimeMillis();

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");                  // only the leader replica must acknowledge
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); // idempotence off, so retries may write duplicates
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        // properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "0");          // was 1; can set to 10 ms for more batching
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16340");        // close to the 16384-byte default

        Producer<String, String> producer = new KafkaProducer<>(properties);

        String jsonFilePath = "d:\\temp\\test.json"; // 500K messages, file size 400MB
        try (JsonReader jsonReader = new JsonReader(new FileReader(jsonFilePath))) {
            Gson gson = new GsonBuilder().create();
            jsonReader.beginArray();                  // start of JSON array
            while (jsonReader.hasNext()) {            // next JSON array element
                String message = gson.fromJson(jsonReader, JsonObject.class).toString();
                recordCt += 1;
                // asynchronous send; every record uses the same key
                producer.send(new ProducerRecord<>("topic_name", "instanceKey", message));
                if (recordCt % 10000 == 0) {
                    System.out.println("Published -> " + String.format("%,d", recordCt));
                }
            }
            jsonReader.endArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        producer.close(); // flushes buffered records, so the timing below includes delivery

        System.out.println();
        System.out.println("Total records published to topic: " + String.format("%,d", recordCt));
        long durationInSecs = (System.currentTimeMillis() - startTime) / 1000;
        System.out.println();
        System.out.println("Total elapsed time to process: " + durationInSecs + " seconds");
        System.out.println();
    }
}

Thank you,

On Thursday, November 19, 2020, 04:54:30 PM EST, Site Register <site.regis...@ymail.com.invalid> wrote:

Hi Camel Users,

I can set the key for an individual message using KafkaConstants.KEY in the header. However, the whole process took an extremely long time (more than 20 minutes) to produce 500K keyed messages into the Kafka topic. I also tried producing the messages in batches without assigning a key, which took 14 seconds for 500K messages. So my question is: how do I assign the key for batched messages?

Processor (GsonStreamProcessor):

if (numberOfRecords % batchSize == 0) {
    List<String> batchList = new ArrayList<String>();   // filling of batchList not shown in the post
    exchange.getIn().setBody(batchList);
    exchange.getIn().setHeader(KafkaConstants.KEY, "key-" + keyCount); // this line does not work with batch messages
    // note: Camel recommends creating a ProducerTemplate once and reusing it
    ProducerTemplate template = exchange.getContext().createProducerTemplate();
    template.send("vm:msg", exchange);
}

Route:

from("direct:start")
    .process("GsonStreamProcessor")
    .log("Done");

from("vm:msg")
    .to("kafka:.....");

Thank you,
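One way to approach the batching question is to note that a Kafka key belongs to each record, not to a batch as a whole: every `ProducerRecord` carries its own key. The sketch below is plain Java with hypothetical names (`KeyedBatcher`, `batchWithKeys`) and only shows the grouping step: records are split into fixed-size batches, each batch gets a key in the `"key-" + keyCount` style used in the processor, and every record in that batch would then be sent with that key (for example as `new ProducerRecord<>(topic, key, record)`, or, in Camel, by setting `KafkaConstants.KEY` on each exchange after the split). It does not talk to Kafka.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: groups records into batches of `batchSize` and assigns
// each batch a key ("key-0", "key-1", ...). Every record in a batch is meant
// to be sent with that batch's key.
public class KeyedBatcher {

    public static Map<String, List<String>> batchWithKeys(List<String> records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Map<String, List<String>> batches = new LinkedHashMap<>(); // keeps batch order
        int keyCount = 0;
        List<String> current = new ArrayList<>();
        for (String record : records) {
            current.add(record);
            if (current.size() == batchSize) {      // batch full: assign its key, start a new one
                batches.put("key-" + keyCount++, current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {                   // trailing partial batch
            batches.put("key-" + keyCount, current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> records = List.of("{\"id\":1}", "{\"id\":2}", "{\"id\":3}", "{\"id\":4}", "{\"id\":5}");
        batchWithKeys(records, 2).forEach((key, batch) ->
                System.out.println(key + " -> " + batch));
        // key-0 -> [{"id":1}, {"id":2}]
        // key-1 -> [{"id":3}, {"id":4}]
        // key-2 -> [{"id":5}]
    }
}
```

Whether each batch should share one key or each record should get its own depends on how partitioning and ordering are meant to work for this topic; the sketch only shows the per-batch variant the original question asks about.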