Re: Help is processing huge data through Kafka-storm cluster
Hi All,

Thanks for your valuable comments. Sure, I will give Samza and DataTorrent a try. Meanwhile, I am sharing a screenshot of the Storm UI. Please have a look at it.

The Kafka producer is able to push 35 million messages to the broker in two hours, at a rate of approx. 4k messages per second. On the other side, Storm is consuming at a maximum speed of 1100 messages per second. This means Storm is consuming messages about 4 times slower than Kafka is producing them.

We are running these systems in production and I am a bit worried about data loss. Kafka is pushing 35 million messages in 2 hours and Storm is taking 7-8 hours to process that amount of data. There is a lag of 6 hours, which is very scary.

Please suggest whether I can do something to improve the performance of the existing application before moving to a new system.

Thanks in advance.

Regards,
Riyaz
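As a rough check of the figures reported in this follow-up (35 million messages in two hours, 1100 messages per second on the Storm side), the following sketch works out the implied producer rate and the time a consumer at that speed needs for the same volume. The function names are illustrative, and the calculation assumes constant rates, which the thread does not confirm.

```python
def rate_per_second(messages: int, hours: float) -> float:
    """Average message rate for a volume moved in a given number of hours."""
    return messages / (hours * 3600)


def hours_to_process(messages: int, rate: float) -> float:
    """Hours needed to process a volume at a constant rate (messages/second)."""
    return messages / rate / 3600


produced = 35_000_000   # messages pushed to the broker (figure from the message)
produce_hours = 2       # time the producer needed
consume_rate = 1100     # maximum Storm consumption rate, messages/second

produce_rate = rate_per_second(produced, produce_hours)
consume_hours = hours_to_process(produced, consume_rate)

print(f"producer rate:   {produce_rate:.0f} msg/s")
print(f"time to consume: {consume_hours:.1f} h")
print(f"slowdown factor: {produce_rate / consume_rate:.1f}x")
```

With these inputs the producer rate comes out somewhat above the quoted 4k messages per second and the consume time a little above the quoted 7-8 hours, which fits the numbers in the message being approximate.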
Re: Help is processing huge data through Kafka-storm cluster
To clarify my last email: by 10 nodes, I mean 10 Kafka partitions distributed across 10 different brokers. In my test, DataTorrent scales up linearly with Kafka partitions without any problem. Whatever you produce to Kafka, it can easily take into your application. And I'm quite sure it can handle much more data than Kafka's own limit. :)

Best,
Siyuan
Re: Help is processing huge data through Kafka-storm cluster
Hi Shaikh,

I have heard of some throughput bottlenecks in Storm; it cannot really scale up with Kafka. I recommend that you try the DataTorrent platform (https://www.datatorrent.com/). The platform itself is not open source, but it has an open-source library (https://github.com/DataTorrent/Malhar) which contains Kafka ingestion functions. The library is pretty cool: it can scale up dynamically with Kafka partitions and is fully HA. And in your case you might be able to use the platform for free. (It's free if your application doesn't require a large amount of memory.)

With the DataTorrent platform and the open-source library I can scale my application up to 300k/s (10 nodes, 3 replicas, 1 KB msg, 0.8.0 client). I heard the performance of the Kafka client has been improved in the 0.8.1 release :)

Best,
Siyuan

On Sat, Jun 14, 2014 at 8:14 AM, Shaikh Ahmed rnsr.sha...@gmail.com wrote:

Hi,

Every day we download 28 million messages, and monthly it goes up to 800+ million. We want to process this amount of data through our Kafka and Storm cluster and store it in an HBase cluster. We are targeting to process one month of data in one day. Is it possible?

We set up our cluster thinking that we could process millions of messages per second, as mentioned on the web. Unfortunately, we have ended up processing only 1200-1700 messages per second. If we continue at this speed, it will take a minimum of 10 days to process 30 days of data, which is not the relevant solution in our case.

I suspect that we have to change some configuration to achieve this goal. I am looking for help from experts to support me in achieving this task.

*Kafka Cluster:*
Kafka is running on two dedicated machines with 48 GB of RAM and 2 TB of storage. We have a Kafka cluster of 11 nodes in total, spread across these two servers.

*Kafka Configuration:*
producer.type=async
compression.codec=none
request.required.acks=-1
serializer.class=kafka.serializer.StringEncoder
queue.buffering.max.ms=10
batch.num.messages=1
queue.buffering.max.messages=10
default.replication.factor=3
controlled.shutdown.enable=true
auto.leader.rebalance.enable=true
num.network.threads=2
num.io.threads=8
num.partitions=4
log.retention.hours=12
log.segment.bytes=536870912
log.retention.check.interval.ms=6
log.cleaner.enable=false

*Storm Cluster:*
Storm is running with 5 supervisors and 1 nimbus on IBM servers with 48 GB of RAM and 8 TB of storage. These servers are shared with the HBase cluster.

*Kafka spout configuration*
kafkaConfig.bufferSizeBytes = 1024*1024*8;
kafkaConfig.fetchSizeBytes = 1024*1024*4;
kafkaConfig.forceFromStart = true;

*Topology: StormTopology*
Spout - Partition: 4
First Bolt - parallelism hint: 6 and Num tasks: 5
Second Bolt - parallelism hint: 5
Third Bolt - parallelism hint: 3
Fourth Bolt - parallelism hint: 3 and Num tasks: 4
Fifth Bolt - parallelism hint: 3
Sixth Bolt - parallelism hint: 3

*Supervisor configuration:*
storm.local.dir: /app/storm
storm.zookeeper.port: 2181
storm.cluster.mode: distributed
storm.local.mode.zmq: false
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
supervisor.worker.start.timeout.secs: 180
supervisor.worker.timeout.secs: 30
supervisor.monitor.frequency.secs: 3
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true
storm.messaging.netty.server_worker_threads: 2
storm.messaging.netty.client_worker_threads: 2
storm.messaging.netty.buffer_size: 52428800 #50MB buffer
storm.messaging.netty.max_retries: 25
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
supervisor.childopts: -Xmx1024m -Djava.net.preferIPv4Stack=true
worker.childopts: -Xmx2048m -Djava.net.preferIPv4Stack=true

Please let me know if more information is needed.

Thanks in advance.

Regards,
Riyaz
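To make the sizing in the original message easier to reason about, here is a small sketch that tallies the worker slots and the requested bolt executors from the listed settings, and splits the observed rate across the Kafka partitions. The variable names and layout are made up for illustration. It assumes that a parallelism hint is the number of executors requested for a component, that "Spout - Partition: 4" means the topic has 4 partitions, and that load is spread evenly across them; none of this is confirmed in the thread.

```python
# Values copied from the original message; the structure is illustrative only.
supervisors = 5
slot_ports = [6700, 6701, 6702, 6703]   # supervisor.slots.ports
kafka_partitions = 4                    # "Spout - Partition: 4" (assumed meaning)

# Parallelism hints of the six bolts, in topology order.
bolt_parallelism = {
    "first": 6,
    "second": 5,
    "third": 3,
    "fourth": 3,
    "fifth": 3,
    "sixth": 3,
}

observed_rates = (1200, 1700)           # messages/second reported for the topology

worker_slots = supervisors * len(slot_ports)
bolt_executors = sum(bolt_parallelism.values())
per_partition = tuple(rate / kafka_partitions for rate in observed_rates)

print(f"worker slots available:   {worker_slots}")
print(f"bolt executors requested: {bolt_executors}")
print(f"observed rate per partition: {per_partition[0]:.0f}-{per_partition[1]:.0f} msg/s")
```

Laying the numbers out this way shows that the read side is bounded by the 4 partitions, while the bolts request far more executors than that.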
Re: Help is processing huge data through Kafka-storm cluster
We have been experimenting with Samza, which is also worth a look. It's basically a topic-to-topic node on YARN.
Re: Help is processing huge data through Kafka-storm cluster
Samza is an open source stream processing framework built on top of Kafka and YARN. It is high throughput, scalable, and has built-in state management and fault tolerance support. Though I may be biased, it is worth taking a look :-)

Thanks,
Neha
Re: Help is processing huge data through Kafka-storm cluster
What throughput are you getting from your Kafka cluster alone? Storm throughput can depend on the processing you are actually doing inside it, so you must look at each component, starting with Kafka.

Regards,
Pushkar
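One way to follow this advice is to time each stage on its own before timing the whole pipeline. The sketch below is a generic helper, not part of Kafka or Storm: `measure_throughput` and the stand-in `fake_source` are hypothetical names, and a real test would replace the generator with a consumer that only reads from Kafka and does no further processing.

```python
import time
from typing import Callable, Iterable, Tuple


def measure_throughput(source: Iterable, handle: Callable[[object], None]) -> Tuple[int, float]:
    """Run `handle` on every item of `source`; return (count, items per second)."""
    start = time.perf_counter()
    count = 0
    for item in source:
        handle(item)
        count += 1
    elapsed = time.perf_counter() - start
    # Guard against a zero interval on very fast or empty runs.
    rate = count / elapsed if elapsed > 0 else float("inf")
    return count, rate


def fake_source(n: int):
    """Stand-in for a message source; yields n small payloads."""
    for i in range(n):
        yield f"message-{i}"


# Baseline: drain the source with a no-op handler (no processing at all).
count, rate = measure_throughput(fake_source(100_000), lambda msg: None)
print(f"{count} messages at {rate:,.0f} msg/s")
```

Running the same helper first with a no-op handler and then with the real per-message work gives two numbers to compare, which helps show whether the source or the processing is the limiting stage.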
Re: Help is processing huge data through Kafka-storm cluster
And one more thing: using the Kafka metrics you can easily monitor the rate at which you are able to publish to Kafka and the speed at which your consumer (in this case your spout) is able to drain messages out of Kafka. It's possible that, because of slow draining, even the publishing rate might be affected in the worst case: if the consumer lags too far behind, consuming the older messages will result in disk seeks.
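The concern about a lagging consumer can be put in numbers. The sketch below estimates how fast the backlog grows and when the oldest unread messages would become as old as the broker's retention period. It assumes constant rates and continuous production (the thread only gives figures for a 2-hour burst), uses the 4k and 1100 messages-per-second figures and `log.retention.hours=12` quoted elsewhere in the thread, and ignores that Kafka deletes data in whole log segments. The function names are illustrative.

```python
def backlog_growth_per_hour(produce_rate: float, consume_rate: float) -> float:
    """Messages added to the backlog each hour (0 if the consumer keeps up)."""
    return max(produce_rate - consume_rate, 0) * 3600


def hours_until_retention_hit(produce_rate: float, consume_rate: float,
                              retention_hours: float) -> float:
    """Hours until the oldest unread message is as old as the retention period.

    With constant rates, after t hours the consumer has read what was produced
    in the first t * consume_rate / produce_rate hours, so the oldest unread
    message has age t * (1 - consume_rate / produce_rate).
    """
    if consume_rate >= produce_rate:
        return float("inf")  # the consumer keeps up; the lag never grows
    return retention_hours / (1 - consume_rate / produce_rate)


produce_rate = 4000     # approx. producer rate from the thread, messages/second
consume_rate = 1100     # maximum Storm consumption rate, messages/second
retention_hours = 12    # log.retention.hours from the posted configuration

growth = backlog_growth_per_hour(produce_rate, consume_rate)
deadline = hours_until_retention_hit(produce_rate, consume_rate, retention_hours)

print(f"backlog grows by {growth:,.0f} messages per hour")
print(f"retention limit reached after about {deadline:.1f} hours")
```

Under these assumptions the estimate only indicates the order of magnitude; the consumer lag metrics on the actual cluster are the figures to trust.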
Re: Help is processing huge data through Kafka-storm cluster
Hi Riyaz,

There are a number of reasons that you may be getting low performance. Here are some questions to get started:

1. How big are your messages? To meet your throughput requirement you need a minimum of 10K messages per second continuously. You specified a replication factor of 3, so at a message length of 500 bytes (for example) you would need to write a minimum of 15 MB/second continuously across both hosts. That is a small amount or a large amount depending on your storage configuration.

2. How did you determine the throughput rate? Is the throughput number end-to-end, including Storm and HBase, or do you see the low throughput for Kafka itself? In either case, can you isolate the rates of ingress and egress to Kafka?

Assuming the problem is in Kafka, here are some more questions.

3. Are you running VMs? If so, what kind, and how many CPUs are allocated to each VM?

4. What kind of storage do you have? According to your description you have 11 nodes over two hosts? At the level you are attempting to reach, anything less than SSDs or very performant RAID may be an issue due to random I/O. If you have network-attached storage, this can be a huge bottleneck.

5. What kind of network cards do you have?

6. What kind of stats do you see on the hosts when your tests are running?
   - What is the I/O wait? Anything above a few percent indicates problems. (top gives good numbers.)
   - What is the run queue length? CPU starvation could be a problem, especially if you have VMs. (top and uptime give good numbers.)
   - How much memory is in the OS page cache? This has a big impact on I/O efficiency if you are short of memory. (free -g gives useful numbers.)
   - On a related topic, are you reading from storage or are your reads served from memory? (iostat should ideally show no reads from storage, only writes, because all reads are served from the OS page cache.)
   - Are you swapping?

7. What is the memory size for your JVMs, and are you using Java 7? Do you have G1 GC enabled according to current Kafka recommendations?

8. Where is ZooKeeper running? It can be a bottleneck at high transaction rates.

9. How many topics do you have?

10. How many producers do you have and where are they running?

11. How many consumers are you running? I don't know Storm, so it's hard to tell from the configuration you have listed how many would run or where they would operate. It seems possible you need to spread processing across more independent hosts, but that is a guess pending other information.

It is hard to evaluate your Kafka settings without this.

Best regards,
Robert
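The arithmetic behind question 1 in Robert's message can be written out as a short sketch. The 800 million monthly messages and the one-day target come from the original post; the 500-byte message size is Robert's example value, not a measured one, and the function names are illustrative.

```python
def required_rate(messages: int, seconds: int) -> float:
    """Constant message rate needed to process a volume in the given time."""
    return messages / seconds


def replicated_write_mb_per_s(rate: float, message_bytes: int,
                              replication_factor: int) -> float:
    """Bytes written per second across all replicas, in MB (10**6 bytes)."""
    return rate * message_bytes * replication_factor / 1_000_000


monthly_messages = 800_000_000   # "800+ million" per month
seconds_per_day = 24 * 3600      # target: one month of data in one day

exact_rate = required_rate(monthly_messages, seconds_per_day)
rounded_rate = 10_000            # the round figure used in the message

bandwidth = replicated_write_mb_per_s(rounded_rate, 500, 3)

print(f"required rate: {exact_rate:.0f} msg/s (about 10K)")
print(f"replicated write bandwidth: {bandwidth:.0f} MB/s")
```

Changing the message size in the last call shows how quickly the write bandwidth moves, which is why the actual message size is the first question asked.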
Re: Help is processing huge data through Kafka-storm cluster
+1 for detailed examination of metrics. You can see the main metrics here: https://kafka.apache.org/documentation.html#monitoring

JConsole is very helpful for looking quickly at what is going on.

Cheers,
Robert

On Sun, Jun 15, 2014 at 7:49 AM, pushkar priyadarshi priyadarshi.push...@gmail.com wrote:

And one more thing: using Kafka metrics, you can easily monitor the rate at which you are able to publish to Kafka and the speed at which your consumer (in this case your spout) is able to drain messages out of Kafka. It's possible that slow draining could, in the worst case, even affect the publishing rate: if the consumer lags too far behind, consuming the older messages will result in disk seeks.

On Sun, Jun 15, 2014 at 8:16 PM, pushkar priyadarshi priyadarshi.push...@gmail.com wrote:

What throughput are you getting from your Kafka cluster alone? Storm throughput can depend on what processing you are actually doing inside it, so you must look at each component, starting with Kafka.

Regards,
Pushkar
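The publish-rate versus drain-rate comparison suggested above can be turned into a rough backlog estimate once both numbers have been read from the Kafka metrics. The sketch below is a back-of-the-envelope model, not a monitoring tool: it assumes both rates stay constant, the rates in the usage lines are illustrative placeholders rather than measured values, and it treats retention as a simple time cutoff using the log.retention.hours=12 setting from the original post.

```python
def backlog_after(publish_rate, drain_rate, seconds, initial_backlog=0):
    """Messages still waiting in Kafka after `seconds`, never below zero.

    Rates are in messages per second and assumed constant.
    """
    growth = (publish_rate - drain_rate) * seconds
    return max(0, initial_backlog + growth)


def hours_until_lag_exceeds(publish_rate, drain_rate, retention_hours):
    """Hours of sustained load before the consumer is `retention_hours` behind.

    After t hours the consumer has read what was produced in the first
    t * drain_rate / publish_rate hours, so its time lag is
    t * (1 - drain_rate / publish_rate). Returns None when the consumer
    keeps up and the lag never grows.
    """
    if drain_rate >= publish_rate:
        return None
    return retention_hours / (1 - drain_rate / publish_rate)


# Illustrative rates only (messages per second); substitute measured values.
publish, drain = 4000, 1100
print(f"backlog after 1 hour: {backlog_after(publish, drain, 3600):,} messages")
print(f"hours until lag passes 12h retention: "
      f"{hours_until_lag_exceeds(publish, drain, 12):.1f}")
```

A consumer that falls further behind than the retention window risks having unread log segments deleted, which is a separate concern from the disk seeks mentioned above.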
Help is processing huge data through Kafka-storm cluster
Hi,

We download 28 million messages daily, and monthly the total goes up to 800+ million.

We want to process this amount of data through our Kafka and Storm cluster and store it in an HBase cluster.

We are targeting processing one month of data in one day. Is that possible?

We set up our cluster thinking that we could process millions of messages per second, as mentioned on the web. Unfortunately, we have ended up processing only 1200-1700 messages per second. If we continue at this speed, it will take a minimum of 10 days to process 30 days of data, which is not a viable solution in our case.

I suspect that we have to change some configuration to achieve this goal. I am looking for help from experts in achieving this task.

*Kafka Cluster:*
Kafka is running on two dedicated machines with 48 GB of RAM and 2 TB of storage. We have a Kafka cluster of 11 nodes in total, spread across these two servers.

*Kafka Configuration:*
producer.type=async
compression.codec=none
request.required.acks=-1
serializer.class=kafka.serializer.StringEncoder
queue.buffering.max.ms=10
batch.num.messages=1
queue.buffering.max.messages=10
default.replication.factor=3
controlled.shutdown.enable=true
auto.leader.rebalance.enable=true
num.network.threads=2
num.io.threads=8
num.partitions=4
log.retention.hours=12
log.segment.bytes=536870912
log.retention.check.interval.ms=6
log.cleaner.enable=false

*Storm Cluster:*
Storm is running with 5 supervisors and 1 nimbus on IBM servers with 48 GB of RAM and 8 TB of storage. These servers are shared with the HBase cluster.
*Kafka spout configuration*
kafkaConfig.bufferSizeBytes = 1024*1024*8;
kafkaConfig.fetchSizeBytes = 1024*1024*4;
kafkaConfig.forceFromStart = true;

*Topology: StormTopology*
Spout - Partition: 4
First Bolt - parallelism hint: 6 and Num tasks: 5
Second Bolt - parallelism hint: 5
Third Bolt - parallelism hint: 3
Fourth Bolt - parallelism hint: 3 and Num tasks: 4
Fifth Bolt - parallelism hint: 3
Sixth Bolt - parallelism hint: 3

*Supervisor configuration:*
storm.local.dir: /app/storm
storm.zookeeper.port: 2181
storm.cluster.mode: distributed
storm.local.mode.zmq: false
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
supervisor.worker.start.timeout.secs: 180
supervisor.worker.timeout.secs: 30
supervisor.monitor.frequency.secs: 3
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true
storm.messaging.netty.server_worker_threads: 2
storm.messaging.netty.client_worker_threads: 2
storm.messaging.netty.buffer_size: 52428800 #50MB buffer
storm.messaging.netty.max_retries: 25
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
supervisor.childopts: -Xmx1024m -Djava.net.preferIPv4Stack=true
worker.childopts: -Xmx2048m -Djava.net.preferIPv4Stack=true

Please let me know if more information is needed.

Thanks in advance.

Regards,
Riyaz
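For reference, the throughput target in the post can be put into numbers. The sketch below does only the raw arithmetic on the figures quoted above (800 million messages per month, 1200-1700 messages per second); it assumes a constant rate with no downtime, retries, or HBase write stalls, so real runs would take longer, which may be why the post's own estimate is 10 days. The function names are illustrative and not part of any Kafka or Storm API.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def required_rate(messages, days):
    """Sustained messages per second needed to process `messages` in `days`."""
    return messages / (days * SECONDS_PER_DAY)


def days_needed(messages, rate_per_sec):
    """Days needed to process `messages` at a constant `rate_per_sec`."""
    return messages / rate_per_sec / SECONDS_PER_DAY


MONTHLY_MESSAGES = 800_000_000  # "800+ million" from the post, taken as 800M

print(f"rate needed for one month in one day: "
      f"{required_rate(MONTHLY_MESSAGES, 1):.0f} msg/s")
for observed in (1200, 1700):
    print(f"at {observed} msg/s: {days_needed(MONTHLY_MESSAGES, observed):.1f} days")
```

Under these assumptions the target works out to roughly 9,300 messages per second, which is about five to eight times the observed rate rather than orders of magnitude away.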