Ok from the earlier logs it looks like your tuples are being timed out and getting replayed. In your PrintBolt.execute do collector.ack(tuple) public class PrintBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector;
public void execute(Tuple tuple) { System.out.println("message " + tuple.getValues()); collector.ack(tuple); } public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) { this.collector = arg2; } public void declareOutputFields(OutputFieldsDeclarer arg0) { } } On Mon, Dec 1, 2014, at 07:10 PM, Madabhattula Rajesh Kumar wrote: > Thank you Harsha for your response. > > I'm just printing the messages in printer bolt. > > Please find below printer blot code > > *public class PrintBolt extends BaseRichBolt * *{* * private static > final long serialVersionUID = 1L;* * public void execute(Tuple tuple) > * * {* * System.out.println("message " + tuple.getValues());* * }* * > public void prepare(Map arg0, TopologyContext arg1, OutputCollector > arg2) {* * }* * public void declareOutputFields(OutputFieldsDeclarer > arg0) {* * }* *}* > > Regards, Rajesh > > > On Tue, Dec 2, 2014 at 8:19 AM, Harsha <st...@harsha.io> wrote: >> __ >> Does your printer bolt ack the messages it received from KafkaSpout. >> >> >> On Mon, Dec 1, 2014, at 06:38 PM, Madabhattula Rajesh Kumar wrote: >>> Hello, >>> >>> Could any one help me on above mail query? >>> >>> Regards, Rajesh >>> >>> On Sat, Nov 29, 2014 at 10:30 PM, Madabhattula Rajesh Kumar >>> <mrajaf...@gmail.com> wrote: >>>> Hello, >>>> >>>> I'm new to Storm and Kafka. I have tried Strom-Kafka integration >>>> example program. Now I'm able to send message from Kafka and >>>> receive those messages in storm topology. >>>> >>>> I have observed one thing in storm topology, same messages are >>>> processing continuously >>>> >>>> *I have sent three messages (First Message, Second Message, Third >>>> Message ). These 3 messages processing continuously, please find >>>> below console log file* >>>> >>>> *Could you please help me on below query* >>>> * *How to make sure that storm topology process messages one time >>>> successfully(Not multiple times). * >>>> * *What configurations I need to do * *Below is my code :* >>>> >>>> * BrokerHosts zk = new ZkHosts("localhost:2181");* * SpoutConfig >>>> spoutConf = new SpoutConfig(zk, "test-topic", "/kafkastorm", >>>> "discovery");* * spoutConf.scheme = new SchemeAsMultiScheme(new >>>> StringScheme());* * KafkaSpout spout = new >>>> KafkaSpout(spoutConf);* * TopologyBuilder builder = new >>>> TopologyBuilder();* * builder.setSpout("spout", spout, 1);* * >>>> builder.setBolt("printerbolt", new PrintBolt()) >>>> .shuffleGrouping("spout");* * Config config = new Config();* * >>>> config.setDebug(true);* * LocalCluster cluster = new >>>> LocalCluster();* * cluster.submitTopology("kafka", config, >>>> builder.createTopology());* >>>> >>>> *Log file :* >>>> >>>> 25526 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: >>>> spout default [First Message] 25528 [Thread-8-printerbolt] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: spout:3, stream: default, id: >>>> {-5148901491748001310=-1334200518948214946}, [First Message] >>>> *message [First Message]* 25538 [Thread-10-spout] INFO >>>> backtype.storm.daemon.task - Emitting: spout __ack_init >>>> [-5148901491748001310 -1334200518948214946 3] 25539 >>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: spout:3, stream: __ack_init, >>>> id: {}, [-5148901491748001310 -1334200518948214946 3] 33530 >>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout >>>> default [Second Message] 33531 [Thread-8-printerbolt] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: spout:3, stream: default, id: >>>> {-8623931148894813393=4843611232629293066}, [Second Message] >>>> *message [Second Message]* 33531 [Thread-10-spout] INFO >>>> backtype.storm.daemon.task - Emitting: spout __ack_init >>>> [-8623931148894813393 4843611232629293066 3] 33532 >>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: spout:3, stream: __ack_init, >>>> id: {}, [-8623931148894813393 4843611232629293066 3] 38532 >>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout >>>> default [Thrid Message] 38536 [Thread-8-printerbolt] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: spout:3, stream: default, id: >>>> {-7749553958395790620=-1739211867328620785}, [Thrid Message] >>>> *message [Thrid Message]* 38537 [Thread-10-spout] INFO >>>> backtype.storm.daemon.task - Emitting: spout __ack_init >>>> [-7749553958395790620 -1739211867328620785 3] 38537 >>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: spout:3, stream: __ack_init, >>>> id: {}, [-7749553958395790620 -1739211867328620785 3] 46201 >>>> [Thread-10-spout] INFO backtype.storm.daemon.executor - Processing >>>> received message source: __system:-1, stream: __tick, id: {}, [30] >>>> 76155 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - >>>> Processing received message source: __system:-1, stream: >>>> __metrics_tick, id: {}, [60] 76159 [Thread-8-printerbolt] INFO >>>> backtype.storm.daemon.task - Emitting: printerbolt __metrics >>>> [#<TaskInfo >>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@780b0702> >>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = >>>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> >>>> #<DataPoint [__receive = {write_pos=4, read_pos=3, capacity=1024, >>>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint >>>> [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> >>>> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> >>>> #<DataPoint [__execute-count = {}]>]] 76202 [Thread-10-spout] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: __system:-1, stream: __tick, id: {}, [30] 76206 >>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout >>>> default [First Message] 76206 [Thread-8-printerbolt] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: spout:3, stream: default, id: >>>> {956790162864404846=494501721511970112}, [First Message] *message >>>> [First Message]* 76207 [Thread-10-spout] INFO >>>> backtype.storm.daemon.task - Emitting: spout __ack_init >>>> [956790162864404846 494501721511970112 3] 76207 [Thread-14-__acker] >>>> INFO backtype.storm.daemon.executor - Processing received message >>>> source: spout:3, stream: __ack_init, id: {}, [956790162864404846 >>>> 494501721511970112 3] 76207 [Thread-10-spout] INFO >>>> backtype.storm.daemon.task - Emitting: spout default [Second >>>> Message] 76208 [Thread-8-printerbolt] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: spout:3, stream: default, id: >>>> {-5947127688111528870=-2474569870953878080}, [Second Message] >>>> *message [Second Message]* 76208 [Thread-10-spout] INFO >>>> backtype.storm.daemon.task - Emitting: spout __ack_init >>>> [-5947127688111528870 -2474569870953878080 3] 76208 >>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: spout:3, stream: __ack_init, >>>> id: {}, [-5947127688111528870 -2474569870953878080 3] 76208 >>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout >>>> default [Thrid Message] 76209 [Thread-8-printerbolt] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: spout:3, stream: default, id: >>>> {4790700513589938438=-2542940781190231591}, [Thrid Message] >>>> *message [Thrid Message]* 76209 [Thread-10-spout] INFO >>>> backtype.storm.daemon.task - Emitting: spout __ack_init >>>> [4790700513589938438 -2542940781190231591 3] 76209 >>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: spout:3, stream: __ack_init, >>>> id: {}, [4790700513589938438 -2542940781190231591 3] 76276 >>>> [Thread-12-__system] INFO backtype.storm.daemon.executor - >>>> Processing received message source: __system:-1, stream: >>>> __metrics_tick, id: {}, [60] 76277 [Thread-12-__system] INFO >>>> backtype.storm.daemon.task - Emitting: __system __metrics >>>> [#<TaskInfo >>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@559c0881> >>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap = >>>> {unusedBytes=39294328, usedBytes=23489160, maxBytes=1003487232, >>>> initBytes=64761920, virtualFreeBytes=979998072, >>>> committedBytes=62783488}]> #<DataPoint [__receive = {write_pos=1, >>>> read_pos=0, capacity=1024, population=1}]> #<DataPoint >>>> [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> >>>> #<DataPoint [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]> >>>> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = >>>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> >>>> #<DataPoint [memory/nonHeap = {unusedBytes=171552, >>>> usedBytes=41312736, maxBytes=224395264, initBytes=24313856, >>>> virtualFreeBytes=183082528, committedBytes=41484288}]> #<DataPoint >>>> [uptimeSecs = 76.666]> #<DataPoint [__transfer = {write_pos=12, >>>> read_pos=12, capacity=1024, population=0}]> #<DataPoint >>>> [startTimeSecs = 1.417279417821E9]> #<DataPoint [__process-latency >>>> = {}]> #<DataPoint [__transfer-count = {}]>]] 76364 >>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: __system:-1, stream: >>>> __metrics_tick, id: {}, [60] 76365 [Thread-14-__acker] INFO >>>> backtype.storm.daemon.task - Emitting: __acker __metrics >>>> [#<TaskInfo >>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@76f2790f> >>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = >>>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> >>>> #<DataPoint [__receive = {write_pos=7, read_pos=6, capacity=1024, >>>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint >>>> [__transfer-count = {}]> #<DataPoint [__execute-latency = >>>> {spout:__ack_init=0.0}]> #<DataPoint [__fail-count = {}]> >>>> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = >>>> {spout:__ack_init=20}]>]] 76377 [Thread-10-spout] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: __system:-1, stream: __metrics_tick, id: {}, [60] 76381 >>>> [Thread-10-spout] WARN storm.kafka.KafkaUtils - No data found in >>>> Kafka Partition partition_1 76382 [Thread-10-spout] INFO >>>> backtype.storm.daemon.task - Emitting: spout __metrics [#<TaskInfo >>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@28ea04cb> >>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = >>>> {write_pos=11, read_pos=11, capacity=1024, population=0}]> >>>> #<DataPoint [__complete-latency = {}]> #<DataPoint [__receive = >>>> {write_pos=3, read_pos=2, capacity=1024, population=1}]> >>>> #<DataPoint [kafkaPartition = >>>> {Partition{host=rajesh-VirtualBox:9092, >>>> partition=0}/fetchAPILatencyMax=19, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=0}/fetchAPICallCount=28451, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=1}/fetchAPICallCount=28452, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=0}/fetchAPIMessageCount=0, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=1}/fetchAPILatencyMean=0.05672711935892029, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=1}/fetchAPIMessageCount=6, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=1}/fetchAPILatencyMax=42, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=0}/fetchAPILatencyMean=0.04481389054866261}]> #<DataPoint >>>> [__transfer-count = {}]> #<DataPoint [__fail-count = {default=20}]> >>>> #<DataPoint [__emit-count = {}]>]] 76943 [Thread-10-spout] INFO >>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager >>>> connections 76949 [Thread-10-spout] INFO >>>> storm.kafka.DynamicBrokersReader - Read partition info from >>>> zookeeper: >>>> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092, >>>> 1=rajesh-VirtualBox:9092}} 76949 [Thread-10-spout] INFO >>>> storm.kafka.KafkaUtils - Task [1/1] assigned >>>> [Partition{host=rajesh-VirtualBox:9092, partition=0}, >>>> Partition{host=rajesh-VirtualBox:9092, partition=1}] 76949 >>>> [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] >>>> Deleted partition managers: [] 76949 [Thread-10-spout] INFO >>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [] >>>> 76949 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] >>>> Finished refreshing 106203 [Thread-10-spout] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: __system:-1, stream: __tick, id: {}, [30] 136154 >>>> [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - >>>> Processing received message source: __system:-1, stream: >>>> __metrics_tick, id: {}, [60] 136155 [Thread-8-printerbolt] INFO >>>> backtype.storm.daemon.task - Emitting: printerbolt __metrics >>>> [#<TaskInfo >>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6ba3a9e9> >>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = >>>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> >>>> #<DataPoint [__receive = {write_pos=8, read_pos=7, capacity=1024, >>>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint >>>> [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> >>>> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> >>>> #<DataPoint [__execute-count = {}]>]] 136204 [Thread-10-spout] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: __system:-1, stream: __tick, id: {}, [30] 136206 >>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout >>>> default [First Message] 136206 [Thread-8-printerbolt] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: spout:3, stream: default, id: >>>> {3336041025082572443=4848943651836321291}, [First Message] *message >>>> [First Message]* 136206 [Thread-10-spout] INFO >>>> backtype.storm.daemon.task - Emitting: spout __ack_init >>>> [3336041025082572443 4848943651836321291 3] 136207 >>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: spout:3, stream: __ack_init, >>>> id: {}, [3336041025082572443 4848943651836321291 3] 136207 >>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout >>>> default [Second Message] 136208 [Thread-8-printerbolt] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: spout:3, stream: default, id: >>>> {8818700006514275130=7403177023020018790}, [Second Message] >>>> *message [Second Message]* 136208 [Thread-10-spout] INFO >>>> backtype.storm.daemon.task - Emitting: spout __ack_init >>>> [8818700006514275130 7403177023020018790 3] 136208 >>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: spout:3, stream: __ack_init, >>>> id: {}, [8818700006514275130 7403177023020018790 3] 136209 >>>> [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout >>>> default [Thrid Message] 136211 [Thread-8-printerbolt] INFO >>>> backtype.storm.daemon.executor - Processing received message >>>> source: spout:3, stream: default, id: >>>> {7897209966477580404=-5223890645152565221}, [Thrid Message] >>>> *message [Thrid Message]* 136211 [Thread-10-spout] INFO >>>> backtype.storm.daemon.task - Emitting: spout __ack_init >>>> [7897209966477580404 -5223890645152565221 3] 136211 >>>> [Thread-14-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: spout:3, stream: __ack_init, >>>> id: {}, [7897209966477580404 -5223890645152565221 3] 136276 >>>> [Thread-12-__system] INFO backtype.storm.daemon.executor - >>>> Processing received message source: __system:-1, stream: >>>> __metrics_tick, id: {}, [60] 136277 [Thread-12-__system] INFO >>>> backtype.storm.daemon.task - Emitting: __system __metrics >>>> [#<TaskInfo >>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@5edafb00> >>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [GC/Copy = {count=26, >>>> timeMs=88}]> #<DataPoint [memory/heap = {unusedBytes=35718120, >>>> usedBytes=27065368, maxBytes=1003487232, initBytes=64761920, >>>> virtualFreeBytes=976421864, committedBytes=62783488}]> #<DataPoint >>>> [__receive = {write_pos=2, read_pos=1, capacity=1024, >>>> population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint >>>> [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 0]> >>>> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = >>>> {}]> #<DataPoint [GC/MarkSweepCompact = {count=0, timeMs=0}]> >>>> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, >>>> capacity=1024, population=0}]> #<DataPoint [memory/nonHeap = >>>> {unusedBytes=71848, usedBytes=41609048, maxBytes=224395264, >>>> initBytes=24313856, virtualFreeBytes=182786216, >>>> committedBytes=41680896}]> #<DataPoint [uptimeSecs = 136.665]> >>>> #<DataPoint [__transfer = {write_pos=18, read_pos=18, >>>> capacity=1024, population=0}]> #<DataPoint [startTimeSecs = >>>> 1.417279417821E9]> #<DataPoint [__process-latency = {}]> >>>> #<DataPoint [__transfer-count = {}]>]] 136364 [Thread-14-__acker] >>>> INFO backtype.storm.daemon.executor - Processing received message >>>> source: __system:-1, stream: __metrics_tick, id: {}, [60] 136364 >>>> [Thread-14-__acker] INFO backtype.storm.daemon.task - Emitting: >>>> __acker __metrics [#<TaskInfo >>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@7a94eda6> >>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = >>>> {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> >>>> #<DataPoint [__receive = {write_pos=11, read_pos=10, capacity=1024, >>>> population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint >>>> [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> >>>> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> >>>> #<DataPoint [__execute-count = {spout:__ack_init=0}]>]] 136379 >>>> [Thread-10-spout] INFO backtype.storm.daemon.executor - Processing >>>> received message source: __system:-1, stream: __metrics_tick, id: >>>> {}, [60] 136382 [Thread-10-spout] WARN storm.kafka.KafkaUtils - No >>>> data found in Kafka Partition partition_1 136383 [Thread-10-spout] >>>> INFO backtype.storm.daemon.task - Emitting: spout __metrics >>>> [#<TaskInfo >>>> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@477e6c29> >>>> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = >>>> {write_pos=17, read_pos=17, capacity=1024, population=0}]> >>>> #<DataPoint [__complete-latency = {}]> #<DataPoint [__receive = >>>> {write_pos=6, read_pos=5, capacity=1024, population=1}]> >>>> #<DataPoint [kafkaPartition = >>>> {Partition{host=rajesh-VirtualBox:9092, >>>> partition=0}/fetchAPILatencyMax=15, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=0}/fetchAPICallCount=30606, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=1}/fetchAPICallCount=30606, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=0}/fetchAPIMessageCount=0, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=1}/fetchAPILatencyMean=0.022773312422400837, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=1}/fetchAPIMessageCount=3, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=1}/fetchAPILatencyMax=16, >>>> Partition{host=rajesh-VirtualBox:9092, >>>> partition=0}/fetchAPILatencyMean=0.026694112265568844}]> >>>> #<DataPoint [__transfer-count = {default=20}]> #<DataPoint >>>> [__fail-count = {default=0}]> #<DataPoint [__emit-count = >>>> {default=20}]>]] 136950 [Thread-10-spout] INFO >>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager >>>> connections 136954 [Thread-10-spout] INFO >>>> storm.kafka.DynamicBrokersReader - Read partition info from >>>> zookeeper: >>>> GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092, >>>> 1=rajesh-VirtualBox:9092}} 136954 [Thread-10-spout] INFO >>>> storm.kafka.KafkaUtils - Task [1/1] assigned >>>> [Partition{host=rajesh-VirtualBox:9092, partition=0}, >>>> Partition{host=rajesh-VirtualBox:9092, partition=1}] 136954 >>>> [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] >>>> Deleted partition managers: [] 136954 [Thread-10-spout] INFO >>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [] >>>> 136954 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task >>>> [1/1] Finished refreshing >>>> >>>> Regards, Rajesh >>> >> >