Michal Turek created KAFKA-6558:
-----------------------------------
Summary: Pre-load topics metadata in Mirror maker
Key: KAFKA-6558
URL: https://issues.apache.org/jira/browse/KAFKA-6558
Project: Kafka
Issue Type: Improvement
Components: tools
Reporter: Michal Turek
Mirror maker starts consuming before topic metadata has been loaded from the
destination Kafka and buffers messages internally in the producer, which may
result in high GC activity, application slowdown, timeouts and
{{OutOfMemoryError}}. Today we were forced to increase the JVM heap from 1 GB
to 4 GB to survive the first few seconds after startup, although Mirror maker
typically needs only about 400 MB in the long term.
The probable cause is that the producer loads topic metadata lazily as part of
{{producer.send()}}, so the buffering happens in the sending phase.
Many messages like the following appeared in the logs.
{noformat}
2018-02-13 10:30:42,524 INFO kafka.tools.MirrorMaker$ [twork-thread | producer-1]: Closing producer due to send failure.
2018-02-13 10:30:42,524 INFO che.kafka.clients.producer.KafkaProducer [twork-thread | producer-1]: Closing the Kafka producer with timeoutMillis = 0 ms.
2018-02-13 10:30:42,524 INFO che.kafka.clients.producer.KafkaProducer [twork-thread | producer-1]: Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
2018-02-13 10:30:42,524 ERROR .producer.internals.ErrorLoggingCallback [twork-thread | producer-1]: Error when sending message to topic AA_MASTER_018 with key: 4 bytes, value: 109 bytes with error:
java.lang.IllegalStateException: Producer is closed forcefully.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
    at java.lang.Thread.run(Thread.java:748)
{noformat}
h4. Proposed change
- Build the Kafka producer client.
- *Warm it up by loading topic metadata from the destination Kafka.* We use
this approach quite successfully in our custom high-throughput application that
is based on the Kafka producer client. Loading the metadata takes about 4-5
seconds.
- Build the Kafka consumer clients and start consumption.
{noformat}
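// Warm up producer: partitionsFor() blocks until the metadata of the given
// topic is available, so nothing is buffered while it is being fetched.
Set<String> topicNames = consumer.listTopics().keySet();
topicNames.forEach(topic -> producer.partitionsFor(topic));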
{noformat}
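The warm-up step above could be narrowed to the topics that are actually mirrored. The following is only a minimal sketch under stated assumptions, not Mirror maker code: {{ProducerWarmUp}} and {{warmUp}} are hypothetical names, the whitelist is assumed to be a Java regular expression that must match the whole topic name, and the metadata loader is passed in as a plain callback so that the sketch needs nothing beyond the JDK.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of Kafka or Mirror maker. */
final class ProducerWarmUp {

    private ProducerWarmUp() {
    }

    /**
     * Invokes metadataLoader once for every topic whose full name matches the
     * whitelist and returns those topics in iteration order.
     *
     * Any exception thrown by metadataLoader (for example a timeout while
     * fetching metadata) propagates to the caller, so startup would fail
     * before consumption begins.
     */
    static List<String> warmUp(Collection<String> topicNames,
                               Pattern whitelist,
                               Consumer<String> metadataLoader) {
        List<String> warmed = new ArrayList<>();
        for (String topic : topicNames) {
            // Assumption: only topics matching the whitelist are mirrored,
            // so only their metadata needs to be loaded up front.
            if (whitelist.matcher(topic).matches()) {
                metadataLoader.accept(topic);
                warmed.add(topic);
            }
        }
        return warmed;
    }
}
```

With real clients, the call might look like {{ProducerWarmUp.warmUp(consumer.listTopics().keySet(), Pattern.compile(whitelist), producer::partitionsFor)}}, executed after the producer is built and before the consumer streams are started. Here {{whitelist}} stands for the value passed to {{--whitelist}}.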
h4. Our context
- Kafka 0.10.2.1
- Each Mirror maker process typically consumes about 1500 messages/s.
- Mirror maker had been stopped for about 1 hour, during which all the messages
accumulated in the source Kafka.
- Mirror maker was unable to start until we increased its JVM heap.
Consumer configuration:
{noformat}
bootstrap.servers=...
group.id=...
security.protocol=SSL
ssl.truststore.location=.../truststore.jks
ssl.truststore.password=...
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
auto.offset.reset=latest
enable.auto.commit=false
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
fetch.min.bytes=40960
session.timeout.ms=60000
{noformat}
Producer configuration:
{noformat}
bootstrap.servers=...
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
acks=1
retries=1
linger.ms=20
buffer.memory=134217728
batch.size=65536
compression.type=lz4
{noformat}
Command line:
{noformat}
kafka.tools.MirrorMaker --new.consumer --num.streams 25 --consumer.config ...
--producer.config ... --offset.commit.interval.ms=60000 --whitelist=...
{noformat}