Implementation via Kafka Connector
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
  <version>3.1.2</version>
</dependency>
private static void publishViaKafkaConnector(Dataset<Row> dataset,
                                             Map<String, String> conf) {
  dataset.write().format("kafka").options(conf).save();
}
conf = {
"kafka.bootstrap.servers": "localhost:9092",
"kafka.acks": "all",
"topic": "test-topic",
"kafka.linger.ms": "10000",
"kafka.batch.size": "10000000"
}
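For reference, the Spark Kafka sink forwards every option prefixed with
"kafka." to the underlying KafkaProducer with the prefix stripped, so
"kafka.linger.ms" reaches the producer as "linger.ms" (consistent with the
startup logs showing it set in ProducerConfig), while unprefixed options
such as "topic" are interpreted by Spark itself. A minimal sketch of that
mapping, using a hypothetical producerParams helper:

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixStrip {
  // Hypothetical helper mirroring the sink's behavior: keys starting with
  // "kafka." are forwarded to the producer with the prefix removed; all
  // other keys (e.g. "topic") stay with Spark and never reach Kafka.
  static Map<String, String> producerParams(Map<String, String> conf) {
    Map<String, String> out = new HashMap<>();
    for (Map.Entry<String, String> e : conf.entrySet()) {
      if (e.getKey().startsWith("kafka.")) {
        out.put(e.getKey().substring("kafka.".length()), e.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("kafka.bootstrap.servers", "localhost:9092");
    conf.put("kafka.linger.ms", "10000");
    conf.put("topic", "test-topic");
    System.out.println("producer sees linger.ms = "
        + producerParams(conf).get("linger.ms"));
  }
}
```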
"kafka.linger.ms" and "kafka.batch.size" are successfully set in
Producer Config (verified via startup logs) but they are not being
honored.
========================================================================================================================
Implementation via Kafka Client
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.6.3</version>
</dependency>
private static void publishViaKafkaClient(Dataset<Row> dataset,
                                          Map<String, String> conf) {
  dataset.foreachPartition(
      (ForeachPartitionFunction<Row>)
          rows -> {
            Properties props = new Properties();
            props.putAll(conf);
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            int partitionCount = producer.partitionsFor("test-topic").size();
            System.out.println("Partition count: " + partitionCount);
            // Track only the last send per topic partition: once that future
            // completes, all earlier sends to the same partition are acked too.
            Map<Integer, Future<RecordMetadata>> lastRecordMetadata =
                new HashMap<>();
            int counter = 0;
            ObjectMapper mapper = new ObjectMapper();
            while (rows.hasNext()) {
              Row row = rows.next();
              int partitionId = counter % partitionCount;  // round-robin
              String value = mapper.writeValueAsString(row.get(0));
              Future<RecordMetadata> future =
                  producer.send(new ProducerRecord<>("test-topic",
                      partitionId, null, value));
              lastRecordMetadata.put(partitionId, future);
              System.out.println("Sent message: null -> " + value);
              counter++;
            }
            lastRecordMetadata.forEach(
                (partition, future) -> {
                  try {
                    RecordMetadata metadata = future.get();
                    System.out.println("Ack Received: " + metadata);
                  } catch (Exception e) {
                    e.printStackTrace();
                  }
                });
            System.out.println("All messages acknowledged by Kafka Server");
            producer.close();
          });
}
conf = {
  "bootstrap.servers": "localhost:9092",
  "acks": "all",
  "linger.ms": "1000",
  "batch.size": "100000",
  "key.serializer":
      "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer":
      "org.apache.kafka.common.serialization.StringSerializer"
}
"linger.ms" and "batch.size" are successfully set in Producer Config
(verified via startup logs) and are being honored.
========================================================================================================================
Now, the question is: why are "kafka.linger.ms" and "kafka.batch.size"
not being honored by the Kafka connector?
Regards,
Abhishek Singla
On Wed, Apr 16, 2025 at 7:19 PM daniel williams <[email protected]>
wrote:
> If you are building a broadcast to construct a producer with a set of
> options, then the producer is simply going to operate however it is
> configured - it has nothing to do with Spark, except that the
> foreachPartition is constructing it via the broadcast.
>
> A strategy I’ve used in the past is to
> * increase memory pool for asynchronous processing
> * make multiple broadcast producers and randomly access the producer to
> balance the asynchronous sending across more thread pools
> * implement back pressure via an adapter class to capture errors
>
> These are the same things you would want to consider when writing any
> high-volume Kafka-based application.
>
>
>
> -dan
>
>
> On Wed, Apr 16, 2025 at 7:17 AM Abhishek Singla <
> [email protected]> wrote:
>
>> Yes, producing via kafka-clients using foreachPartition works as
>> expected. The Kafka producer is initialised within the
>> call(Iterator<T> t) method.
>>
>> The issue is with using the Kafka connector with Spark batch. The
>> configs are not being honored even though they are set in
>> ProducerConfig. This means the record production rate cannot be
>> controlled via the Kafka connector in Spark batch, which can lead to
>> lag on the in-sync replicas if they cannot catch up, and eventually to
>> the Kafka server failing writes once the in-sync replica count drops
>> below the configured min.insync.replicas. Is there any way to solve
>> this using the Kafka connector?
>>
>> Regards,
>> Abhishek Singla
>>
>
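
Daniel's pooled-producer suggestion above could be sketched roughly as
below. ProducerPool is a hypothetical helper: it holds N producers and
picks one at random per send to spread work across several sender threads.
In a real Spark job the pool would be built lazily on each executor from
broadcast config rather than broadcast directly, since KafkaProducer is
not serializable (producers themselves are thread-safe, so sharing pooled
instances across tasks is fine):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

public class ProducerPool<P> {
  private final List<P> pool = new ArrayList<>();

  // Build N producers up front from a factory (e.g. conf -> KafkaProducer).
  public ProducerPool(int size, Supplier<P> factory) {
    for (int i = 0; i < size; i++) {
      pool.add(factory.get());
    }
  }

  // Random selection balances sends across the pooled producers, and
  // therefore across their independent I/O threads and buffer pools.
  public P next() {
    return pool.get(ThreadLocalRandom.current().nextInt(pool.size()));
  }

  public int size() {
    return pool.size();
  }

  public static void main(String[] args) {
    // Strings stand in for producers here to keep the sketch runnable.
    ProducerPool<String> pool = new ProducerPool<>(4, () -> "producer");
    System.out.println("pool size: " + pool.size());
  }
}
```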