Michal Turek created KAFKA-6558:
-----------------------------------

             Summary: Pre-load topics metadata in Mirror maker
                 Key: KAFKA-6558
                 URL: https://issues.apache.org/jira/browse/KAFKA-6558
             Project: Kafka
          Issue Type: Improvement
          Components: tools
            Reporter: Michal Turek


Mirror maker starts consuming before topic metadata has been loaded from the 
destination Kafka. Until then it buffers messages internally in the producer, 
which may result in high GC activity, application slowdown, timeouts and 
{{OutOfMemoryError}}. Today we were forced to increase the JVM heap from 1 GB 
to 4 GB to survive the first few seconds of startup, although Mirror maker 
typically needs only about 400 MB in the long term.

The probable cause is that the producer loads topic metadata lazily as part of 
{{producer.send()}}, so the buffering happens in the sending phase.

Many messages like the following appeared in the logs.

{noformat}
2018-02-13 10:30:42,524 INFO                  kafka.tools.MirrorMaker$ [twork-thread | producer-1]: Closing producer due to send failure.
2018-02-13 10:30:42,524 INFO  che.kafka.clients.producer.KafkaProducer [twork-thread | producer-1]: Closing the Kafka producer with timeoutMillis = 0 ms.
2018-02-13 10:30:42,524 INFO  che.kafka.clients.producer.KafkaProducer [twork-thread | producer-1]: Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
2018-02-13 10:30:42,524 ERROR .producer.internals.ErrorLoggingCallback [twork-thread | producer-1]: Error when sending message to topic AA_MASTER_018 with key: 4 bytes, value: 109 bytes with error:
java.lang.IllegalStateException: Producer is closed forcefully.
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
        at java.lang.Thread.run(Thread.java:748)
{noformat}

h4. Proposed change

- Build the Kafka producer client.
- *Warm it up by loading topic metadata from the destination Kafka.* We use 
this approach successfully in our custom high-throughput application, which is 
based on the Kafka producer client. Loading the metadata takes about 4-5 
seconds.
- Build the Kafka consumer clients and start consumption.

{noformat}
// Warm up the producer: partitionsFor() fetches metadata for each topic up front
Set<String> topicNames = consumer.listTopics().keySet();
topicNames.forEach(producer::partitionsFor);
{noformat}
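
The warm-up step can be sketched as a small helper that is independent of the client classes. This is only an illustration, not Mirror maker code: {{ProducerWarmUp}} and {{warmUp}} are hypothetical names, and the lookup function stands in for {{producer::partitionsFor}}, which blocks until metadata for the topic is available or {{max.block.ms}} expires.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper; not part of Mirror maker or the Kafka client. */
class ProducerWarmUp {

    /**
     * Calls the lookup once per topic so that metadata is fetched before any
     * record is sent. With a real client the lookup would be
     * producer::partitionsFor (assumption: called before consumers start).
     * Returns the partition count per topic, in iteration order.
     */
    static Map<String, Integer> warmUp(
            Collection<String> topics,
            Function<String, ? extends Collection<?>> partitionsFor) {
        Map<String, Integer> partitionCounts = new LinkedHashMap<>();
        for (String topic : topics) {
            partitionCounts.put(topic, partitionsFor.apply(topic).size());
        }
        return partitionCounts;
    }

    public static void main(String[] args) {
        // Stand-in lookup so the sketch runs without a broker:
        // every topic pretends to have three partitions.
        Function<String, List<Integer>> fakeLookup = topic -> Arrays.asList(0, 1, 2);
        Map<String, Integer> counts =
                warmUp(Arrays.asList("AA_MASTER_018", "other-topic"), fakeLookup);
        System.out.println(counts);
    }
}
```

Passing the lookup as a function keeps the ordering explicit (producer first, metadata second, consumers last) and lets the step be exercised without a running cluster.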

h4. Our context

- Kafka 0.10.2.1
- About 1500 messages/s are typically consumed per Mirror maker process.
- Mirror maker had been stopped for about 1 hour, so messages accumulated in 
the source Kafka during that time.
- Mirror maker was unable to start until we increased its JVM heap.
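
For a rough sense of the backlog waiting at startup, the figures above can be multiplied out. This is a back-of-the-envelope sketch only: it assumes the 1500 messages/s rate held for the whole hour, and it borrows the 4-byte key and 109-byte value from the single logged message as a stand-in for the average message size, which may not be representative. {{BacklogEstimate}} is a hypothetical name.

```java
/** Hypothetical back-of-the-envelope calculation; all inputs are assumptions. */
class BacklogEstimate {

    /** Messages accumulated while the process was stopped. */
    static long backlogMessages(long messagesPerSecond, long downtimeSeconds) {
        return messagesPerSecond * downtimeSeconds;
    }

    /** Raw key+value bytes, ignoring record overhead and compression. */
    static long approxPayloadBytes(long messages, long bytesPerMessage) {
        return messages * bytesPerMessage;
    }

    public static void main(String[] args) {
        long messages = backlogMessages(1500, 3600);            // 1 hour of downtime
        long bytes = approxPayloadBytes(messages, 4 + 109);     // key + value from the log sample
        System.out.println(messages + " messages, about " + bytes + " payload bytes");
    }
}
```

Under these assumptions the backlog is 5,400,000 messages, or roughly 610 MB of raw key and value bytes per process.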

Consumer configuration:

{noformat}
bootstrap.servers=...
group.id=...
security.protocol=SSL
ssl.truststore.location=.../truststore.jks
ssl.truststore.password=...
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
auto.offset.reset=latest
enable.auto.commit=false
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
fetch.min.bytes=40960
session.timeout.ms=60000
{noformat}

Producer configuration:

{noformat}
bootstrap.servers=...
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
acks=1
retries=1
linger.ms=20
buffer.memory=134217728
batch.size=65536
compression.type=lz4
{noformat}
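
The byte values in the producer settings are easier to read when converted. The sketch below only decodes the two numbers from the configuration above; {{ProducerBufferMath}} is a hypothetical name, and the batch count is an upper bound that ignores compression and per-record overhead.

```java
/** Hypothetical helper that decodes the byte-valued producer settings. */
class ProducerBufferMath {

    static final long BUFFER_MEMORY = 134_217_728L; // buffer.memory from the config
    static final long BATCH_SIZE = 65_536L;         // batch.size from the config

    static long toMebibytes(long bytes) {
        return bytes / (1024L * 1024L);
    }

    static long toKibibytes(long bytes) {
        return bytes / 1024L;
    }

    /** How many completely full batches fit into the buffer. */
    static long fullBatches(long bufferMemory, long batchSize) {
        return bufferMemory / batchSize;
    }

    public static void main(String[] args) {
        System.out.println("buffer.memory = " + toMebibytes(BUFFER_MEMORY) + " MiB");
        System.out.println("batch.size    = " + toKibibytes(BATCH_SIZE) + " KiB");
        System.out.println("full batches  = " + fullBatches(BUFFER_MEMORY, BATCH_SIZE));
    }
}
```

So {{buffer.memory}} is 128 MiB and {{batch.size}} is 64 KiB, which leaves room for at most 2048 full batches in the producer buffer.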

Mirror maker command line:

{noformat}
kafka.tools.MirrorMaker --new.consumer --num.streams 25 --consumer.config ... --producer.config ... --offset.commit.interval.ms=60000 --whitelist=...
{noformat}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
