Hello,

I'm new to Storm and Kafka. I have tried the Storm-Kafka integration example
program, and I am now able to send messages through Kafka and receive them in
a Storm topology.

However, I have noticed one thing: the topology processes the same messages
over and over.

*I sent three messages (First Message, Second Message, Third Message), and
these three messages are processed again and again. Please find the console
log below.*
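The repetition fits Storm's guaranteed-message-processing model. A reliable spout such as `KafkaSpout` keeps each emitted tuple pending until its tuple tree is acked. If that does not happen within `topology.message.timeout.secs` (30 seconds by default), the tuple is failed and the spout replays it.

The toy model below illustrates that cycle in plain Java. `ReplayModel` and its methods are hypothetical and are not part of Storm's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy, pure-Java model of at-least-once delivery (NOT Storm's real API).
// The "spout" remembers every emitted offset until it is acked; anything still
// pending when the timeout expires is treated as failed and emitted again.
class ReplayModel {
    private final long timeoutMs;
    // offset -> time it was last emitted, kept in insertion order
    private final Map<Long, Long> pending = new LinkedHashMap<>();

    ReplayModel(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Record that the message at this offset was emitted at time nowMs.
    void emit(long offset, long nowMs) {
        pending.put(offset, nowMs);
    }

    // Acking forgets the offset, so it is never replayed.
    void ack(long offset) {
        pending.remove(offset);
    }

    // Collect offsets whose timeout has expired, then re-emit them (resetting their clock).
    List<Long> replayExpired(long nowMs) {
        List<Long> replayed = new ArrayList<>();
        for (Map.Entry<Long, Long> e : pending.entrySet()) {
            if (nowMs - e.getValue() >= timeoutMs) {
                replayed.add(e.getKey());
            }
        }
        for (long offset : replayed) {
            emit(offset, nowMs);
        }
        return replayed;
    }

    public static void main(String[] args) {
        ReplayModel spout = new ReplayModel(30_000); // 30 s, Storm's default message timeout
        spout.emit(0, 0);
        spout.emit(1, 0);
        spout.emit(2, 0);
        spout.ack(1); // only the second message is acked
        System.out.println(spout.replayExpired(30_000)); // [0, 2]
        System.out.println(spout.replayExpired(60_000)); // [0, 2] again: never acked
    }
}
```

In this model an offset keeps coming back until it is acked, much as all three messages keep reappearing in the log below.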


*Could you please help me with the following questions?*

   - *How can I make sure that the Storm topology processes each message
     successfully only once (not multiple times)?*
   - *What configuration do I need?*
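This concerns processing each message only once. Even with correct acking, core Storm guarantees at-least-once processing, so a message can still be replayed after a timeout or a worker failure. Storm's Trident API offers exactly-once semantics for state updates. Outside Trident, a common approach is to make the processing idempotent, so that a replay has no extra effect.

The plain-Java sketch below assumes a hypothetical `IdempotentPrinter` keyed on Kafka partition and offset. Two caveats apply. With the `StringScheme` used in this topology, the tuple carries only the message text, so that key would have to be supplied some other way. A real deduplication store would also need to be durable rather than an in-memory set.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical idempotent handler: handling the same (partition, offset) twice
// has no additional effect, so replays from an at-least-once source are harmless.
class IdempotentPrinter {
    // In-memory only for illustration; a real system needs durable storage.
    private final Set<String> seen = new HashSet<>();
    private int printed = 0;

    // Returns true if the message was processed, false if it was a duplicate.
    boolean handle(int partition, long offset, String message) {
        String key = partition + ":" + offset;
        if (!seen.add(key)) {
            return false; // already processed: skip the side effect
        }
        System.out.println("message [" + message + "]");
        printed++;
        return true;
    }

    int printedCount() {
        return printed;
    }

    public static void main(String[] args) {
        IdempotentPrinter printer = new IdempotentPrinter();
        printer.handle(1, 0, "First Message");
        printer.handle(1, 1, "Second Message");
        printer.handle(1, 0, "First Message"); // replayed duplicate, ignored
        System.out.println(printer.printedCount()); // 2
    }
}
```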

*Below is my code:*

```java
// Kafka spout reading "test-topic"; committed offsets are kept in ZooKeeper
// under zkRoot "/kafkastorm" with id "discovery"
BrokerHosts zk = new ZkHosts("localhost:2181");
SpoutConfig spoutConf = new SpoutConfig(zk, "test-topic", "/kafkastorm", "discovery");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout spout = new KafkaSpout(spoutConf);

// One spout task feeding PrintBolt (not shown) via shuffle grouping
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("printerbolt", new PrintBolt()).shuffleGrouping("spout");

// Run in local mode with debug logging enabled
Config config = new Config();
config.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka", config, builder.createTopology());
```
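`PrintBolt` itself is not included in the post. One explanation consistent with the log is that it never acks its input. The log shows two things:

- The three messages are re-emitted about once a minute, each time right after a `__tick` tuple reaches the spout.
- The spout's `__fail-count` metric is non-zero.

Both fit Storm's message timeout (`topology.message.timeout.secs`, 30 seconds by default). With the `backtype.storm` API used here, a bolt extending `BaseRichBolt` has to call `collector.ack(tuple)` explicitly. A bolt extending `BaseBasicBolt` is acked automatically when `execute` returns normally.

The sketch below models the explicit-ack pattern in plain Java so it runs without storm-core. `Collector` is a hypothetical stand-in for Storm's `OutputCollector`, and tuples are plain `String`s.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a bolt that acks explicitly (the BaseRichBolt pattern).
// NOT Storm code: Collector is a hypothetical stand-in for OutputCollector.
class AckingPrintBolt {
    interface Collector {
        void ack(String tuple);
        void fail(String tuple);
    }

    private final Collector collector;

    AckingPrintBolt(Collector collector) {
        this.collector = collector;
    }

    void execute(String tuple) {
        try {
            // The bolt's real work goes here; printing mirrors PrintBolt's output.
            System.out.println("message [" + tuple + "]");
            collector.ack(tuple); // done: the spout will not replay this tuple
        } catch (RuntimeException e) {
            collector.fail(tuple); // request a replay now instead of waiting for the timeout
        }
    }

    public static void main(String[] args) {
        List<String> acked = new ArrayList<>();
        Collector recorder = new Collector() {
            public void ack(String tuple) { acked.add(tuple); }
            public void fail(String tuple) { }
        };
        AckingPrintBolt bolt = new AckingPrintBolt(recorder);
        bolt.execute("First Message");
        bolt.execute("Second Message");
        System.out.println(acked); // [First Message, Second Message]
    }
}
```

With the real storm-core classes, the same shape is `collector.ack(input)` at the end of `execute(Tuple input)` in a `BaseRichBolt`, with `collector.fail(input)` on error. Alternatively, the bolt can extend `BaseBasicBolt` and let Storm ack on its behalf.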

*Log file:*

25526 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [First Message]
25528 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{-5148901491748001310=-1334200518948214946}, [First Message]
*message [First Message]*
25538 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [-5148901491748001310 -1334200518948214946 3]
25539 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[-5148901491748001310 -1334200518948214946 3]
33530 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Second Message]
33531 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{-8623931148894813393=4843611232629293066}, [Second Message]
*message [Second Message]*
33531 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [-8623931148894813393 4843611232629293066 3]
33532 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[-8623931148894813393 4843611232629293066 3]
38532 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Thrid Message]
38536 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{-7749553958395790620=-1739211867328620785}, [Thrid Message]
*message [Thrid Message]*
38537 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [-7749553958395790620 -1739211867328620785 3]
38537 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[-7749553958395790620 -1739211867328620785 3]
46201 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
76155 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __metrics_tick,
id: {}, [60]
76159 [Thread-8-printerbolt] INFO  backtype.storm.daemon.task - Emitting:
printerbolt __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@780b0702> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1,
capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=4,
read_pos=3, capacity=1024, population=1}]> #<DataPoint [__process-latency =
{}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency =
{}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
#<DataPoint [__execute-count = {}]>]]
76202 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
76206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [First Message]
76206 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{956790162864404846=494501721511970112}, [First Message]
*message [First Message]*
76207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [956790162864404846 494501721511970112 3]
76207 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[956790162864404846 494501721511970112 3]
76207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Second Message]
76208 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{-5947127688111528870=-2474569870953878080}, [Second Message]
*message [Second Message]*
76208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [-5947127688111528870 -2474569870953878080 3]
76208 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[-5947127688111528870 -2474569870953878080 3]
76208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Thrid Message]
76209 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{4790700513589938438=-2542940781190231591}, [Thrid Message]
*message [Thrid Message]*
76209 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [4790700513589938438 -2542940781190231591 3]
76209 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: spout:3, stream: __ack_init, id: {},
[4790700513589938438 -2542940781190231591 3]
76276 [Thread-12-__system] INFO  backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __metrics_tick,
id: {}, [60]
76277 [Thread-12-__system] INFO  backtype.storm.daemon.task - Emitting:
__system __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@559c0881> [#<DataPoint
[__ack-count = {}]> #<DataPoint [memory/heap = {unusedBytes=39294328,
usedBytes=23489160, maxBytes=1003487232, initBytes=64761920,
virtualFreeBytes=979998072, committedBytes=62783488}]> #<DataPoint
[__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]>
#<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]>
#<DataPoint [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]>
#<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue =
{write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint
[memory/nonHeap = {unusedBytes=171552, usedBytes=41312736,
maxBytes=224395264, initBytes=24313856, virtualFreeBytes=183082528,
committedBytes=41484288}]> #<DataPoint [uptimeSecs = 76.666]> #<DataPoint
[__transfer = {write_pos=12, read_pos=12, capacity=1024, population=0}]>
#<DataPoint [startTimeSecs = 1.417279417821E9]> #<DataPoint
[__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
76364 [Thread-14-__acker] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
76365 [Thread-14-__acker] INFO  backtype.storm.daemon.task - Emitting:
__acker __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@76f2790f> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1,
capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=7,
read_pos=6, capacity=1024, population=1}]> #<DataPoint [__process-latency =
{}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency =
{spout:__ack_init=0.0}]> #<DataPoint [__fail-count = {}]> #<DataPoint
[__emit-count = {}]> #<DataPoint [__execute-count =
{spout:__ack_init=20}]>]]
76377 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
76381 [Thread-10-spout] WARN  storm.kafka.KafkaUtils - No data found in
Kafka Partition partition_1
76382 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@28ea04cb> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=11, read_pos=11,
capacity=1024, population=0}]> #<DataPoint [__complete-latency = {}]>
#<DataPoint [__receive = {write_pos=3, read_pos=2, capacity=1024,
population=1}]> #<DataPoint [kafkaPartition =
{Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=19,
Partition{host=rajesh-VirtualBox:9092,
partition=0}/fetchAPICallCount=28451,
Partition{host=rajesh-VirtualBox:9092,
partition=1}/fetchAPICallCount=28452,
Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0,
Partition{host=rajesh-VirtualBox:9092,
partition=1}/fetchAPILatencyMean=0.05672711935892029,
Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=6,
Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=42,
Partition{host=rajesh-VirtualBox:9092,
partition=0}/fetchAPILatencyMean=0.04481389054866261}]> #<DataPoint
[__transfer-count = {}]> #<DataPoint [__fail-count = {default=20}]>
#<DataPoint [__emit-count = {}]>]]
76943 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Refreshing partition manager connections
76949 [Thread-10-spout] INFO  storm.kafka.DynamicBrokersReader - Read
partition info from zookeeper:
GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
1=rajesh-VirtualBox:9092}}
76949 [Thread-10-spout] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned
[Partition{host=rajesh-VirtualBox:9092, partition=0},
Partition{host=rajesh-VirtualBox:9092, partition=1}]
76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Deleted partition managers: []
76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
partition managers: []
76949 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Finished refreshing
106203 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
136154 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __metrics_tick,
id: {}, [60]
136155 [Thread-8-printerbolt] INFO  backtype.storm.daemon.task - Emitting:
printerbolt __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6ba3a9e9> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1,
capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=8,
read_pos=7, capacity=1024, population=1}]> #<DataPoint [__process-latency =
{}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency =
{}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
#<DataPoint [__execute-count = {}]>]]
136204 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
136206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [First Message]
136206 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{3336041025082572443=4848943651836321291}, [First Message]
*message [First Message]*
136206 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [3336041025082572443 4848943651836321291 3]
136207 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: __ack_init, id: {},
[3336041025082572443 4848943651836321291 3]
136207 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Second Message]
136208 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{8818700006514275130=7403177023020018790}, [Second Message]
*message [Second Message]*
136208 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [8818700006514275130 7403177023020018790 3]
136208 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: __ack_init, id: {},
[8818700006514275130 7403177023020018790 3]
136209 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
default [Thrid Message]
136211 [Thread-8-printerbolt] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: default, id:
{7897209966477580404=-5223890645152565221}, [Thrid Message]
*message [Thrid Message]*
136211 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__ack_init [7897209966477580404 -5223890645152565221 3]
136211 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
Processing received message source: spout:3, stream: __ack_init, id: {},
[7897209966477580404 -5223890645152565221 3]
136276 [Thread-12-__system] INFO  backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __metrics_tick,
id: {}, [60]
136277 [Thread-12-__system] INFO  backtype.storm.daemon.task - Emitting:
__system __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@5edafb00> [#<DataPoint
[__ack-count = {}]> #<DataPoint [GC/Copy = {count=26, timeMs=88}]>
#<DataPoint [memory/heap = {unusedBytes=35718120, usedBytes=27065368,
maxBytes=1003487232, initBytes=64761920, virtualFreeBytes=976421864,
committedBytes=62783488}]> #<DataPoint [__receive = {write_pos=2,
read_pos=1, capacity=1024, population=1}]> #<DataPoint [__fail-count = {}]>
#<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 0]>
#<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>
#<DataPoint [GC/MarkSweepCompact = {count=0, timeMs=0}]> #<DataPoint
[__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]>
#<DataPoint [memory/nonHeap = {unusedBytes=71848, usedBytes=41609048,
maxBytes=224395264, initBytes=24313856, virtualFreeBytes=182786216,
committedBytes=41680896}]> #<DataPoint [uptimeSecs = 136.665]> #<DataPoint
[__transfer = {write_pos=18, read_pos=18, capacity=1024, population=0}]>
#<DataPoint [startTimeSecs = 1.417279417821E9]> #<DataPoint
[__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
136364 [Thread-14-__acker] INFO  backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __metrics_tick,
id: {}, [60]
136364 [Thread-14-__acker] INFO  backtype.storm.daemon.task - Emitting:
__acker __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@7a94eda6> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1,
capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=11,
read_pos=10, capacity=1024, population=1}]> #<DataPoint [__process-latency
= {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency
= {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>
#<DataPoint [__execute-count = {spout:__ack_init=0}]>]]
136379 [Thread-10-spout] INFO  backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
136382 [Thread-10-spout] WARN  storm.kafka.KafkaUtils - No data found in
Kafka Partition partition_1
136383 [Thread-10-spout] INFO  backtype.storm.daemon.task - Emitting: spout
__metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@477e6c29> [#<DataPoint
[__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=17, read_pos=17,
capacity=1024, population=0}]> #<DataPoint [__complete-latency = {}]>
#<DataPoint [__receive = {write_pos=6, read_pos=5, capacity=1024,
population=1}]> #<DataPoint [kafkaPartition =
{Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPILatencyMax=15,
Partition{host=rajesh-VirtualBox:9092,
partition=0}/fetchAPICallCount=30606,
Partition{host=rajesh-VirtualBox:9092,
partition=1}/fetchAPICallCount=30606,
Partition{host=rajesh-VirtualBox:9092, partition=0}/fetchAPIMessageCount=0,
Partition{host=rajesh-VirtualBox:9092,
partition=1}/fetchAPILatencyMean=0.022773312422400837,
Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPIMessageCount=3,
Partition{host=rajesh-VirtualBox:9092, partition=1}/fetchAPILatencyMax=16,
Partition{host=rajesh-VirtualBox:9092,
partition=0}/fetchAPILatencyMean=0.026694112265568844}]> #<DataPoint
[__transfer-count = {default=20}]> #<DataPoint [__fail-count =
{default=0}]> #<DataPoint [__emit-count = {default=20}]>]]
136950 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Refreshing partition manager connections
136954 [Thread-10-spout] INFO  storm.kafka.DynamicBrokersReader - Read
partition info from zookeeper:
GlobalPartitionInformation{partitionMap={0=rajesh-VirtualBox:9092,
1=rajesh-VirtualBox:9092}}
136954 [Thread-10-spout] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned
[Partition{host=rajesh-VirtualBox:9092, partition=0},
Partition{host=rajesh-VirtualBox:9092, partition=1}]
136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Deleted partition managers: []
136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
partition managers: []
136954 [Thread-10-spout] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Finished refreshing

Regards,
Rajesh
