This is an automated email from the ASF dual-hosted git repository.
wombatu-kun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b5879f95ad14 perf(kafka-connect): reuse AvroConvertor across records in the connect writer (#19015)
b5879f95ad14 is described below
commit b5879f95ad1447c41bd75ee284ba509268eaf1f1
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Jun 16 17:12:36 2026 +0700
perf(kafka-connect): reuse AvroConvertor across records in the connect writer (#19015)
---
.../apache/hudi/connect/writers/AbstractConnectWriter.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index 70fefe431603..149ace0403cd 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -53,6 +53,10 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
private final KeyGenerator keyGenerator;
private final SchemaProvider schemaProvider;
protected final KafkaConnectConfigs connectConfigs;
+ private final String kafkaValueConverter;
+ // Reused across all records of this writer (one writer is created per commit, single-threaded),
+ // instead of re-parsing the schema and rebuilding the converter on every record.
+ private AvroConvertor convertor;
public AbstractConnectWriter(KafkaConnectConfigs connectConfigs,
KeyGenerator keyGenerator,
@@ -61,23 +65,26 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
this.keyGenerator = keyGenerator;
this.schemaProvider = schemaProvider;
this.instantTime = instantTime;
+ this.kafkaValueConverter = connectConfigs.getKafkaValueConverter();
}
@Override
public void writeRecord(SinkRecord record) throws IOException {
- AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceHoodieSchema());
Option<GenericRecord> avroRecord;
- switch (connectConfigs.getKafkaValueConverter()) {
+ switch (kafkaValueConverter) {
case KAFKA_AVRO_CONVERTER:
avroRecord = Option.of((GenericRecord) record.value());
break;
case KAFKA_STRING_CONVERTER:
+ if (convertor == null) {
+ convertor = new AvroConvertor(schemaProvider.getSourceHoodieSchema());
+ }
avroRecord = Option.of(convertor.fromJson((String) record.value()));
break;
case KAFKA_JSON_CONVERTER:
throw new UnsupportedEncodingException("Currently JSON objects are not supported");
default:
- throw new IOException("Unsupported Kafka Format type (" + connectConfigs.getKafkaValueConverter() + ")");
+ throw new IOException("Unsupported Kafka Format type (" + kafkaValueConverter + ")");
}
// Tag records with a file ID based on kafka partition and hudi partition.