Thank you very much, Harsha.

Regards,
Rajesh

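Harsha's diagnosis in the thread below (tuples time out and are replayed because PrintBolt never acks) follows from how Storm tracks a spout tuple. Storm's "Guaranteeing Message Processing" documentation describes the acker keeping one 64-bit value per spout tuple. That value is XORed with the id of every tuple created in the tree and again with the id of every tuple that is acked, so it returns to zero only when the whole tree has been processed. The `__ack_init` lines in the log appear to seed that value. The class below is a toy model of the bookkeeping, not Storm's code: `XorAckerSketch`, `init`, `ack` and `isComplete` are hypothetical names, and timeouts are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of XOR-based tuple-tree tracking (hypothetical class, not Storm's acker).
public class XorAckerSketch {
    // Spout root id -> running XOR of the edge ids seen so far.
    private final Map<Long, Long> pending = new HashMap<>();

    // Spout emits: seed the value with the id of the tuple it sent (cf. __ack_init).
    public void init(long rootId, long edgeId) {
        pending.merge(rootId, edgeId, (a, b) -> a ^ b);
    }

    // A bolt acks its input edge and reports any new tuples it anchored to it.
    public void ack(long rootId, long ackedEdge, long... anchoredEdges) {
        long x = ackedEdge;
        for (long e : anchoredEdges) {
            x ^= e;
        }
        pending.merge(rootId, x, (a, b) -> a ^ b);
    }

    // Zero means every created id has also been acked (each XORed in twice).
    public boolean isComplete(long rootId) {
        Long v = pending.get(rootId);
        return v != null && v == 0L;
    }

    public static void main(String[] args) {
        XorAckerSketch acker = new XorAckerSketch();
        long root = 42L;
        long edge = 0x5DEECE66DL; // arbitrary id for the tuple sent to PrintBolt
        acker.init(root, edge);
        System.out.println(acker.isComplete(root)); // false: PrintBolt has not acked yet
        acker.ack(root, edge);                      // what collector.ack(tuple) reports
        System.out.println(acker.isComplete(root)); // true
    }
}
```

With the original PrintBolt the `ack` call never happens, so the value stays non-zero until the spout's timeout fails the tuple and KafkaSpout replays it. In Storm, extending `BaseBasicBolt` instead of `BaseRichBolt` is an alternative, since a basic bolt is acked automatically after `execute` returns.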
On Tue, Dec 2, 2014 at 8:50 AM, Harsha <st...@harsha.io> wrote:
> OK, from the earlier logs it looks like your tuples are timing out
> and getting replayed. In your PrintBolt.execute, call collector.ack(tuple):
>
> import java.util.Map;
>
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichBolt;
> import backtype.storm.tuple.Tuple;
>
> public class PrintBolt extends BaseRichBolt
> {
>     private static final long serialVersionUID = 1L;
>     private OutputCollector collector;
>
>     public void execute(Tuple tuple)
>     {
>         System.out.println("message " + tuple.getValues());
>         // Ack so the spout records the message as processed instead of
>         // timing it out and replaying it.
>         collector.ack(tuple);
>     }
>
>     public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
>         // Keep the collector so that execute() can ack tuples.
>         this.collector = arg2;
>     }
>
>     public void declareOutputFields(OutputFieldsDeclarer arg0) {
>         // This bolt emits nothing, so it declares no output fields.
>     }
> }
>
> On Mon, Dec 1, 2014, at 07:10 PM, Madabhattula Rajesh Kumar wrote:
> Thank you, Harsha, for your response.
> I'm just printing the messages in the printer bolt.
> Please find the printer bolt code below:
>
> public class PrintBolt extends BaseRichBolt
> {
>     private static final long serialVersionUID = 1L;
>     public void execute(Tuple tuple)
>     {
>         System.out.println("message " + tuple.getValues());
>     }
>     public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
>     }
>     public void declareOutputFields(OutputFieldsDeclarer arg0) {
>     }
> }
>
> Regards,
> Rajesh
>
> On Tue, Dec 2, 2014 at 8:19 AM, Harsha <st...@harsha.io> wrote:
> Does your printer bolt ack the messages it receives from KafkaSpout?
>
> On Mon, Dec 1, 2014, at 06:38 PM, Madabhattula Rajesh Kumar wrote:
> Hello,
> Could anyone help me with the query in my earlier mail?
>
> Regards,
> Rajesh
>
> On Sat, Nov 29, 2014 at 10:30 PM, Madabhattula Rajesh Kumar <mrajaf...@gmail.com> wrote:
> Hello,
>
> I'm new to Storm and Kafka. I have tried the Storm-Kafka integration example
> program. I'm now able to send messages through Kafka and receive them in the
> Storm topology.
>
> I have observed one thing in the Storm topology: the same messages are
> processed over and over.
>
> I sent three messages (First Message, Second Message, Third Message).
> These three messages are processed repeatedly; please find the console
> log below.
>
> Could you please help me with the following questions:
>
> - How do I make sure the Storm topology processes each message
>   successfully only once (not multiple times)?
> - What configuration do I need?
>
> Below is my code:
>
> BrokerHosts zk = new ZkHosts("localhost:2181");
> SpoutConfig spoutConf = new SpoutConfig(zk, "test-topic", "/kafkastorm", "discovery");
> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> KafkaSpout spout = new KafkaSpout(spoutConf);
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("spout", spout, 1);
> builder.setBolt("printerbolt", new PrintBolt()).shuffleGrouping("spout");
> Config config = new Config();
> config.setDebug(true);
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology("kafka", config, builder.createTopology());
>
> Log file:
>
> 25526 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout default [First Message]
> 25528 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: default, id: {-5148901491748001310=-1334200518948214946}, [First Message]
> message [First Message]
> 25538 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [-5148901491748001310 -1334200518948214946 3]
> 25539 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: __ack_init, id: {}, [-5148901491748001310 -1334200518948214946 3]
> 33530 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout default [Second Message]
> 33531 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: default, id: {-8623931148894813393=4843611232629293066}, [Second Message]
> message [Second Message]
> 33531 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [-8623931148894813393 4843611232629293066 3]
> 33532 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: __ack_init, id: {}, [-8623931148894813393 4843611232629293066 3]
> 38532 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout default [Thrid Message]
> 38536 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: default, id: {-7749553958395790620=-1739211867328620785}, [Thrid Message]
> message [Thrid Message]
> 38537 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [-7749553958395790620 -1739211867328620785 3]
> 38537 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: __ack_init, id: {}, [-7749553958395790620 -1739211867328620785 3]
> 46201 [Thread-10-spout] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> 76155 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 76159 [Thread-8-printerbolt] INFO backtype.storm.daemon.task - Emitting: printerbolt __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@780b0702> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=4, read_pos=3, capacity=1024, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
> 76202 [Thread-10-spout] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> 76206 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout default [First Message]
> 76206 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: default, id: {956790162864404846=494501721511970112}, [First Message]
> message [First Message]
> 76207 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [956790162864404846 494501721511970112 3]
> 76207 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: __ack_init, id: {}, [956790162864404846 494501721511970112 3]
> 76207 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout default [Second Message]
> 76208 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: default, id: {-5947127688111528870=-2474569870953878080}, [Second Message]
> message [Second Message]
> 76208 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [-5947127688111528870 -2474569870953878080 3]
> 76208 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: __ack_init, id: {}, [-5947127688111528870 -2474569870953878080 3]
> 76208 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout default [Thrid Message]
> 76209 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: default, id: {4790700513589938438=-2542940781190231591}, [Thrid Message]
> message [Thrid Message]
> 76209 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [4790700513589938438 -2542940781190231591 3]
> 76209 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: __ack_init, id: {}, [4790700513589938438 -2542940781190231591 3]
> 76276 [Thread-12-__system] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 76277 [Thread-12-__system] INFO backtype.storm.daemon.task - Emitting: __system __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@559c0881> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap = {unusedBytes=39294328, usedBytes=23489160, maxBytes=1003487232, initBytes=64761920, virtualFreeBytes=979998072, committedBytes=62783488}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=171552, usedBytes=41312736, maxBytes=224395264, initBytes=24313856, virtualFreeBytes=183082528, committedBytes=41484288}]> #<DataPoint [uptimeSecs = 76.666]> #<DataPoint [__transfer = {write_pos=12, read_pos=12, capacity=1024, population=0}]> #<DataPoint [startTimeSecs = 1.417279417821E9]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
> 76364 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 76365 [Thread-14-__acker] INFO backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@76f2790f> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=7, read_pos=6, capacity=1024, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {spout:__ack_init=0.0}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {spout:__ack_init=20}]>]]
> 76377 [Thread-10-spout] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 76381 [Thread-10-spout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_1
> 76382 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@28ea04cb> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=11, read_pos=11, capacity=1024, population=0}]> #<DataPoint [__complete-latency = {}]> #<DataPoint [__receive = {write_pos=3, read_pos=2, capacity=1024, population=1}]> #<DataPoint [kafkaPartition = {Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=19, Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPICallCount=28451, Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPICallCount=28452, Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0, Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMean=0.05672711935892029, Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=6, Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=42, Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMean=0.04481389054866261}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__fail-count = {default=20}]> #<DataPoint [__emit-count = {}]>]]
> 76943 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
> 76949 [Thread-10-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092, 1=rajesh-VirtualBox:9092}}
> 76949 [Thread-10-spout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=rajesh-VirtualBox:9092, partition=0}, Partition{host=rajesh-VirtualBox:9092, partition=1}]
> 76949 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
> 76949 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
> 76949 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
> 106203 [Thread-10-spout] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> 136154 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 136155 [Thread-8-printerbolt] INFO backtype.storm.daemon.task - Emitting: printerbolt __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6ba3a9e9> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=8, read_pos=7, capacity=1024, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
> 136204 [Thread-10-spout] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> 136206 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout default [First Message]
> 136206 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: default, id: {3336041025082572443=4848943651836321291}, [First Message]
> message [First Message]
> 136206 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [3336041025082572443 4848943651836321291 3]
> 136207 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: __ack_init, id: {}, [3336041025082572443 4848943651836321291 3]
> 136207 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout default [Second Message]
> 136208 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: default, id: {8818700006514275130=7403177023020018790}, [Second Message]
> message [Second Message]
> 136208 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [8818700006514275130 7403177023020018790 3]
> 136208 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: __ack_init, id: {}, [8818700006514275130 7403177023020018790 3]
> 136209 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout default [Thrid Message]
> 136211 [Thread-8-printerbolt] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: default, id: {7897209966477580404=-5223890645152565221}, [Thrid Message]
> message [Thrid Message]
> 136211 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout __ack_init [7897209966477580404 -5223890645152565221 3]
> 136211 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: __ack_init, id: {}, [7897209966477580404 -5223890645152565221 3]
> 136276 [Thread-12-__system] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 136277 [Thread-12-__system] INFO backtype.storm.daemon.task - Emitting: __system __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@5edafb00> [#<DataPoint [__ack-count = {}]> #<DataPoint [GC/Copy = {count=26, timeMs=88}]> #<DataPoint [memory/heap = {unusedBytes=35718120, usedBytes=27065368, maxBytes=1003487232, initBytes=64761920, virtualFreeBytes=976421864, committedBytes=62783488}]> #<DataPoint [__receive = {write_pos=2, read_pos=1, capacity=1024, population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 0]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [GC/MarkSweepCompact = {count=0, timeMs=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=71848, usedBytes=41609048, maxBytes=224395264, initBytes=24313856, virtualFreeBytes=182786216, committedBytes=41680896}]> #<DataPoint [uptimeSecs = 136.665]> #<DataPoint [__transfer = {write_pos=18, read_pos=18, capacity=1024, population=0}]> #<DataPoint [startTimeSecs = 1.417279417821E9]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
> 136364 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 136364 [Thread-14-__acker] INFO backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@7a94eda6> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=11, read_pos=10, capacity=1024, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {spout:__ack_init=0}]>]]
> 136379 [Thread-10-spout] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 136382 [Thread-10-spout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_1
> 136383 [Thread-10-spout] INFO backtype.storm.daemon.task - Emitting: spout __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@477e6c29> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=17, read_pos=17, capacity=1024, population=0}]> #<DataPoint [__complete-latency = {}]> #<DataPoint [__receive = {write_pos=6, read_pos=5, capacity=1024, population=1}]> #<DataPoint [kafkaPartition = {Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=15, Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPICallCount=30606, Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPICallCount=30606, Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0, Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMean=0.022773312422400837, Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=3, Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=16, Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMean=0.026694112265568844}]> #<DataPoint [__transfer-count = {default=20}]> #<DataPoint [__fail-count = {default=0}]> #<DataPoint [__emit-count = {default=20}]>]]
> 136950 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
> 136954 [Thread-10-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092, 1=rajesh-VirtualBox:9092}}
> 136954 [Thread-10-spout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=rajesh-VirtualBox:9092, partition=0}, Partition{host=rajesh-VirtualBox:9092, partition=1}]
> 136954 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
> 136954 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
> 136954 [Thread-10-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>
> Regards,
> Rajesh
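The timestamps in the log fit a timeout-driven replay. The spout gets a `__tick` every 30 s (the `[30]` entries, which match Storm's default `topology.message.timeout.secs` of 30). The three messages are re-emitted right after the ticks at 76202 ms and 136204 ms, that is, between 30 and 60 s after they were last sent. That is the pattern a two-bucket rotating map produces, which is our understanding of how Storm 0.9's spout executor tracks pending tuples; treat that as an assumption. The class below is a hypothetical, stripped-down simulation (`TimeoutReplaySketch` and its methods are invented names, not Storm or storm-kafka APIs) that replays the emit and tick times from the log. For simplicity it re-emits at the tick time rather than a few milliseconds later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of timeout-driven replay with a two-bucket rotating map.
public class TimeoutReplaySketch {
    // Newest bucket first; tick() expires whatever is left in the last bucket.
    private final LinkedList<Map<String, Long>> buckets = new LinkedList<>();
    // Every emit recorded as "timeMs message", including replays.
    public final List<String> emits = new ArrayList<>();

    public TimeoutReplaySketch(int numBuckets) {
        for (int i = 0; i < numBuckets; i++) {
            buckets.add(new HashMap<>());
        }
    }

    // Spout emits a message and starts tracking it in the newest bucket.
    public void emit(long timeMs, String msg) {
        emits.add(timeMs + " " + msg);
        buckets.getFirst().put(msg, timeMs);
    }

    // Bolt acked: the message is no longer pending, so it can never expire.
    public void ack(String msg) {
        for (Map<String, Long> bucket : buckets) {
            bucket.remove(msg);
        }
    }

    // Tick: rotate the buckets; anything still pending in the dropped bucket
    // is treated as failed and re-emitted (in sorted order for readability).
    public void tick(long timeMs) {
        Map<String, Long> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        expired.keySet().stream().sorted().forEach(msg -> emit(timeMs, msg));
    }

    public static void main(String[] args) {
        TimeoutReplaySketch spout = new TimeoutReplaySketch(2);
        // Emit and tick times (ms) copied from the log; PrintBolt never acks.
        spout.emit(25526, "First Message");
        spout.emit(33530, "Second Message");
        spout.emit(38532, "Thrid Message");
        for (long t : new long[] {46201, 76202, 106203, 136204}) {
            spout.tick(t);
        }
        spout.emits.forEach(System.out::println);
    }
}
```

Running `main` prints nine emit lines: the original three, then all three again at 76202 and at 136204, mirroring the replays in the log. Calling `ack` right after each `emit`, which is what the fixed PrintBolt does, leaves only the first three.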