Andrei Iatsuk created KAFKA-12481:
-------------------------------------

             Summary: Add socket.nagle.disable config to reduce number of 
packets 
                 Key: KAFKA-12481
                 URL: https://issues.apache.org/jira/browse/KAFKA-12481
             Project: Kafka
          Issue Type: Improvement
          Components: core
    Affects Versions: 2.6.1, 2.7.0, 2.5.1, 2.4.1, 2.3.1, 2.2.2, 2.1.1, 2.0.1, 
1.1.1, 1.0.2, 0.11.0.3, 0.10.2.2, 0.8.2.2
            Reporter: Andrei Iatsuk
         Attachments: Screenshot 2021-03-13 at 00.29.10.png, Screenshot 
2021-03-13 at 00.44.43.png, Screenshot 2021-03-14 at 01.46.00.png, Screenshot 
2021-03-14 at 01.52.01.png, Screenshot 2021-03-16 at 21.05.03.png, Screenshot 
2021-03-16 at 21.12.17.png

*What to do?*

Add a _socket.nagle.disable_ parameter to the Apache Kafka broker config, like the one in 
[librdkafka|https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md].

*What is the reason for this improvement?*

A large number of topic-partitions on one broker causes a sharp rise in the 
host's packets/sec metric. The cloud traffic shaper cannot cope with this 
packet rate, which leads to service degradation.

*How to reproduce?*
 # Create a Kafka cluster with 4 brokers. The packet rate is ~120 packets/sec.
 # Add 100 topics with 100 partitions each and replication factor = 3, which 
gives 30k partition replicas in total. The packet rate rises to ~15k packets/sec.
{code:python}
import os

BOOTSTRAP = "databus.andrei-iatsuk.ec.odkl.ru:9092"

# Create 100 topics, each with 100 partitions and replication factor 3.
for i in range(100):
  print(f"create topic 'flower{i}'... ", end="")
  cmd = (
    f"kafka-topics.sh --create --bootstrap-server {BOOTSTRAP} "
    f"--topic flower{i} --partitions 100 --replication-factor 3"
  )
  code = os.system(cmd)
  print("ok" if code == 0 else "error")
{code}
!Screenshot 2021-03-16 at 21.05.03.png!

 # Generate server load by running the following script in 4 terminals. The 
packet rate rises to ~130k packets/sec.
{code:python}
from pykafka import KafkaClient

client = KafkaClient(hosts="databus.andrei-iatsuk.ec.odkl.ru:9092")
# Loop forever, sending 1000 messages to each of the 100 topics in turn.
while True:
  for i in range(100):
    print(f"sending messages to 'flower{i}'")
    with client.topics[f"flower{i}"].get_sync_producer() as producer:
      for j in range(1000):
        payload = f"test message {j} in topic flower{i}" * 10
        producer.produce(payload.encode())
{code}
!Screenshot 2021-03-13 at 00.44.43.png! 
 !Screenshot 2021-03-13 at 00.29.10.png!

 # Capture the TCP traffic with tcpdump for ~2 sec:
{code:bash}
$ tcpdump -i eth1 -w dump.pcap
tcpdump: listening on eth1, link-type EN10MB (Ethernet), capture size 262144 bytes
^C8873886 packets captured
9139050 packets received by filter
265028 packets dropped by kernel
{code}

 # Load the dump into Wireshark and look at the statistics: ~99.999% of the 
packets are inter-broker messages, with packet sizes of 40-160 bytes. In the 
screenshots, the hosts with IPs 10.16.23.[157-160] are the brokers:
 !Screenshot 2021-03-14 at 01.46.00.png! 
 !Screenshot 2021-03-14 at 01.52.01.png!
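
The packet-size statistic from the last step can also be checked without Wireshark. Below is a minimal sketch, assuming dump.pcap is in the classic pcap format with microsecond timestamps (not pcapng). The function name _packet_size_histogram_ and the 40-byte bucket width are illustrative choices, not part of the original report. Sizes are on-the-wire frame lengths taken from each pcap record header.

```python
import struct
from collections import Counter


def packet_size_histogram(stream, bucket=40):
    """Count packets per size bucket in a classic pcap capture.

    `stream` is a binary file object; `bucket` is the bucket width in bytes.
    """
    header = stream.read(24)  # the pcap global header is 24 bytes long
    magic = header[:4]
    if magic == b"\xd4\xc3\xb2\xa1":
        endian = "<"  # file written on a little-endian host
    elif magic == b"\xa1\xb2\xc3\xd4":
        endian = ">"  # file written on a big-endian host
    else:
        raise ValueError("not a classic microsecond-resolution pcap file")

    histogram = Counter()
    while True:
        record = stream.read(16)  # per-packet record header
        if len(record) < 16:
            break
        # Fields: ts_sec, ts_usec, captured length, original length
        _, _, incl_len, orig_len = struct.unpack(endian + "IIII", record)
        histogram[(orig_len // bucket) * bucket] += 1
        stream.read(incl_len)  # skip the captured packet bytes
    return dict(histogram)
```

To use it, open the capture in binary mode, for example _with open("dump.pcap", "rb") as f: print(packet_size_histogram(f))_. A capture dominated by small inter-broker packets would show most of the counts in the lowest buckets.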

*How to fix?*
 # Add a boolean _socket.nagle.disable_ parameter to the Apache Kafka config and 
pass its value to the kafka.network.Acceptor.accept(key) method in: 
[https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/network/SocketServer.scala#L646]
 # Results with TCP_NODELAY disabled:
 ## ~400 packets/s for an idle broker (instead of ~12k packets/s)
 ## ~3k packets/s for a loaded broker (instead of ~150k packets/s)
 !Screenshot 2021-03-16 at 21.12.17.png!
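
The proposed switch can be illustrated with a small sketch. It is written in Python rather than the broker's Scala, and it is not the actual Kafka change. The helper names _nagle_disable_from_props_ and _configure_accepted_socket_ are hypothetical, and the default of _true_ is an assumption chosen so that a broker without the new property keeps TCP_NODELAY enabled.

```python
import socket


def nagle_disable_from_props(props, default=True):
    """Read the hypothetical socket.nagle.disable property from a dict of
    broker properties. The default here is an assumption, not Kafka behavior."""
    value = props.get("socket.nagle.disable")
    if value is None:
        return default
    return value.strip().lower() == "true"


def configure_accepted_socket(sock, nagle_disable):
    """Set TCP_NODELAY on an accepted TCP socket according to the setting.

    nagle_disable=True  -> TCP_NODELAY on  (small writes are sent immediately)
    nagle_disable=False -> TCP_NODELAY off (Nagle's algorithm may coalesce
                           small writes into fewer packets)
    Returns the TCP_NODELAY state reported by the OS afterwards.
    """
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,
                    1 if nagle_disable else 0)
    return sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) != 0
```

For example, _configure_accepted_socket(conn, nagle_disable_from_props({"socket.nagle.disable": "false"}))_ leaves Nagle's algorithm active on _conn_. Fewer packets come at the cost of possible extra latency for small writes, which is one reason to keep this a per-broker option rather than a hard-coded value.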



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
