A few days ago, I started migrating an entire collection from MongoDB to a 
Kafka log, for the purpose of benchmarking stream processing engines. 

In summary, the MongoDB collection contains approximately 560 million documents 
with a mean size of 2,529 bytes. At the time of writing, the migration to Kafka 
is still running - three days and counting. 

The instance I am running the migration on has the following configuration: 
16 x 2.6 GHz CPU cores, 64 GB of RAM, and a 4.6 TB hard drive. 

The Python script I wrote is as follows: 

import argparse

from bson.codec_options import CodecOptions
from bson.json_util import dumps  # handles BSON types such as ObjectId and datetime
from kafka import KafkaProducer
from pymongo import MongoClient


class Arguments:
  def __init__(self, arguments):
    self.arguments = arguments

  @property
  def mongodb_host(self):
    return self.arguments['mongodb.host']

  @property
  def mongodb_database(self):
    return self.arguments['mongodb.database']

  @property
  def mongodb_collection(self):
    return self.arguments['mongodb.collection']

  @property
  def kafka_bootstrap_servers(self):
    return self.arguments['kafka.bootstrap.servers']

  @property
  def kafka_topic(self):
    return self.arguments['kafka.topic']


argument_parser = argparse.ArgumentParser(
  description='MongoDB collection to Apache Kafka topic migration script')
argument_parser.add_argument('-m', '--mongodb.host',
  default='mongodb://localhost:27017', help='MongoDB hostname:port')
argument_parser.add_argument('-d', '--mongodb.database', required=True,
  help='MongoDB database')
argument_parser.add_argument('-c', '--mongodb.collection', required=True,
  help='MongoDB source collection')
argument_parser.add_argument('-k', '--kafka.bootstrap.servers',
  default='localhost:9092', help='Apache Kafka bootstrap server')
argument_parser.add_argument('-t', '--kafka.topic', required=True,
  help='Apache Kafka target topic')

arguments = Arguments(vars(argument_parser.parse_args()))

mongodb_client = MongoClient(arguments.mongodb_host)
mongodb_database = mongodb_client[arguments.mongodb_database]

# Each BSON document is serialized to JSON; bson.json_util.dumps handles
# ObjectId, datetime, and other BSON types the standard json module cannot.
kafka_producer = KafkaProducer(
  bootstrap_servers=[arguments.kafka_bootstrap_servers],
  value_serializer=lambda x: dumps(x).encode('ascii'),
  acks='all',
  retries=3)

# Full collection scan; Unicode decoding errors in stored strings are ignored
# rather than aborting the cursor.
collection = mongodb_database[arguments.mongodb_collection].with_options(
  codec_options=CodecOptions(unicode_decode_error_handler='ignore'))

for document in collection.find():
  kafka_producer.send(arguments.kafka_topic, document)

kafka_producer.flush()
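
For what it is worth, the direction I was considering for producer-side tuning 
is explicit batching and compression. Everything below is only a sketch; the 
linger_ms, batch_size, buffer_memory, and cursor batch values are guesses on 
my part rather than measured optima:

# Hypothetical replacement for the producer and loop above; all numeric
# values are guesses, not measured optima.
kafka_producer = KafkaProducer(
  bootstrap_servers=[arguments.kafka_bootstrap_servers],
  value_serializer=lambda x: dumps(x).encode('ascii'),
  acks=1,                          # trade some durability for throughput vs. acks='all'
  retries=3,
  linger_ms=50,                    # wait up to 50 ms to fill larger batches
  batch_size=512 * 1024,           # bigger batches for ~2.5 KB documents
  compression_type='gzip',         # compress batches on the wire
  buffer_memory=256 * 1024 * 1024)

# Larger MongoDB cursor batches reduce round trips during the full scan.
for document in collection.find().batch_size(1000):
  kafka_producer.send(arguments.kafka_topic, document)

kafka_producer.flush()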

The Kafka configuration I use is, in summary: a replication factor of 1, 6 log 
partitions, 8 I/O threads (num.io.threads), and 3 network threads 
(num.network.threads). 
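
In server.properties terms, that corresponds to the following broker entries 
(the replication factor is the broker-side default; the topic itself was 
created with a replication factor of 1):

num.partitions=6
num.io.threads=8
num.network.threads=3
default.replication.factor=1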

Based on this information, I have the following questions:

1. How can I improve the overall performance? I am aware that part of the 
overhead might come from the MongoDB driver I use.
2. Can I improve the performance by increasing the number of log partitions? 
In addition, what is the "secret" behind choosing an optimal number of log 
partitions?
3. Can I improve the performance by increasing the number of I/O threads, 
given my hardware configuration?
4. If I increase e.g. the number of log partitions in order to raise 
throughput, will message consumption performance decrease? (See the consumer 
sketch after this list.)
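
To make the last question concrete, the consuming side of the benchmark would 
look roughly like the sketch below. The topic name and group id are 
placeholders of mine, and as I understand it the useful number of consumers in 
one group is capped by the partition count (currently 6):

from bson.json_util import loads
from kafka import KafkaConsumer

# Sketch of one consumer in a consumer group; running up to six of these
# (one per partition) is the maximum useful parallelism with 6 partitions.
consumer = KafkaConsumer(
  'target-topic',                          # placeholder topic name
  bootstrap_servers=['localhost:9092'],
  group_id='benchmark-consumers',          # placeholder group id
  auto_offset_reset='earliest',
  value_deserializer=lambda m: loads(m.decode('ascii')))

for message in consumer:
  # benchmark logic goes here; message.value is the deserialized document
  pass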

If I missed any relevant information, please do not hesitate to ask.

Thanks a lot for your help!

Kind regards,
Dominik Safaric 

—
Dominik Safaric | Software Engineer
+385 91 606 9504 | dominik.safa...@media-soft.info
Media-Soft | http://www.media-soft.info 
