Lots of malware. 210 MB is the upper limit I want to support in the system we are building.
Most of the messages are metrics, command messages, metadata about the malware, etc. But so far Kafka 0.8 has worked out really well for moving the malware files, whitelist files, and the unknown files around, placing them in storage, and running them through our metadata collection workers. 100s of servers.

I am just trying to track down some memory issues on one class of workers that store the whitelist files for testing our AV engine definitions before release.

Cheers,
Eric Sites

On 8/15/13 12:27 AM, "Jun Rao" <jun...@gmail.com> wrote:

>You need to set fetch.size in the consumer config to be at least 210 MB plus
>message overhead (about 10 bytes) in Kafka. What data are you sending?
>210 MB for a single message is a bit unusual for Kafka.
>
>Thanks,
>
>Jun
>
>
>On Wed, Aug 14, 2013 at 9:11 PM, Eric Sites <eric.si...@threattrack.com> wrote:
>
>> Everyone,
>>
>> I need a little help figuring out how buffers are allocated in Kafka
>> consumers ver 0.8.
>>
>> What are the proper settings for a consumer that needs to receive a
>> single message that is 210 MB in size?
>>
>> The consumer listens to multiple topics, each with a single partition.
>> One of the topics is where the 210 MB messages will come from; the
>> other topics carry very small messages.
>>
>> consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
>> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>> topicCountMap.put(paradiso_scan_job_topic, 1);
>> topicCountMap.put(paradiso_scan_cancel_topic, 1);
>> topicCountMap.put(paradiso_add_worker_name_topic, 1);
>> topicCountMap.put(paradiso_file_delete_topic, 1);
>>
>> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>>     consumer.createMessageStreams(topicCountMap);
>>
>> // Start worker status threads to watch the other topics
>> ParadisoWorkerCancelConsumer cancel_consumer = new ParadisoWorkerCancelConsumer(
>>     consumerMap.get(paradiso_scan_cancel_topic).get(0));
>> cancel_consumer.start();
>>
>> ParadisoWorkerFileAdd file_add = new ParadisoWorkerFileAdd(
>>     consumerMap.get(paradiso_add_worker_name_topic).get(0));
>> file_add.start();
>>
>> ParadisoWorkerFileDelete file_delete = new ParadisoWorkerFileDelete(
>>     consumerMap.get(paradiso_file_delete_topic).get(0));
>> file_delete.start();
>>
>> KafkaStream<byte[], byte[]> stream = consumerMap.get(paradiso_scan_job_topic).get(0);
>> ConsumerIterator<byte[], byte[]> it = stream.iterator();
>>
>> while (it.hasNext() && !time_to_shutdown) {
>>
>> Thanks,
>> Eric Sites
>>
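The while loop body is truncated in the quoted message. A minimal sketch of how a 0.8 ConsumerIterator is typically drained, purely for illustration: handleScanJob() is a hypothetical stand-in for the worker's real processing, and the shutdown call assumes the connector is still in scope.

    // Sketch only, continuing the snippet above; handleScanJob() is
    // hypothetical, not from the thread.
    while (it.hasNext() && !time_to_shutdown) {
        kafka.message.MessageAndMetadata<byte[], byte[]> record = it.next();
        byte[] payload = record.message(); // up to 210 MB on the scan-job topic
        handleScanJob(payload);
    }
    consumer.shutdown(); // stops fetcher threads and releases buffered chunks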
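On the settings question: in the 0.8 high-level consumer, the property Jun is referring to is fetch.message.max.bytes (fetch.size was the 0.7 name). A minimal sketch of what the createConsumerConfig() referenced above could look like; the ZooKeeper address, group id, and the 220 MB figure are illustrative assumptions, not values from the thread:

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    // Sketch only: connection details and sizes are assumptions.
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // hypothetical ZK address
        props.put("group.id", "paradiso_worker");         // hypothetical group id
        // Max bytes fetched per topic-partition per request; must exceed the
        // largest message plus overhead. 220 MB leaves headroom over 210 MB.
        props.put("fetch.message.max.bytes", Integer.toString(220 * 1024 * 1024));
        return new ConsumerConfig(props);
    }

The broker has to accept messages that large as well (message.max.bytes, plus replica.fetch.max.bytes on replicated topics). Note too that the 0.8 consumer can buffer up to fetch.message.max.bytes for each partition it owns, with several chunks queued per stream (queued.max.message.chunks), so a worker subscribed to multiple topics can hold more than one such buffer at once, which is a plausible source of the memory pressure described above.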