Hi Siddharth,

I have only used the Kafka spout in a Trident topology, since I thought it would be easy to add an .each function to parse the stream. I do include two storm-kafka packages in my pom; here is the relevant part:
<!-- Storm-Kafka compiled -->
<dependency>
    <artifactId>storm-kafka</artifactId>
    <groupId>org.apache.storm</groupId>
    <version>0.9.2-incubating</version>
    <!-- <scope>compile</scope> -->
    <!-- exclude the zookeeper package from storm-Kafka -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.0-wip16a-scala292</version>
    <!-- exclude the zookeeper package from storm-Kafka -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

When I first ran the KafkaSpout, I was stuck for a while. After talking back and forth with the developers here, I realized that version conflicts are a real issue that needs attention: you must keep the ZooKeeper, Storm, and Kafka versions consistent, or exclude the conflicting package in the pom; otherwise you will run into problems.

Thanks

Alec

On Wed, Aug 13, 2014 at 7:20 AM, siddharth ubale <siddharth.ub...@gmail.com> wrote:

> hi ,
>
> Just curious, did u face any issue with using kafka Spout if u did not use
> trident?
> Are u also able to implement the KafkaSpout packaged with Storm ?
> I am asking cos i am unable to use the kafkaSpout(SpoutConfig) to read
> from kafka topic. i am using kafka 8.1.1 and storm is 9.0.1 ....
> Can you lemme know about any issue u faced??
>
> i get no error while submitting my program but only a never ending
> sequence of the following:
> 68582 [Thread-33-words] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 68588 [Thread-33-words] INFO backtype.storm.daemon.task - Emitting: words __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@a92bbf9> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__complete-latency = {}]> #<DataPoint [__receive = {write_pos=3, read_pos=2, capacity=1024, population=1}]> #<DataPoint [kafkaOffset = {totalLatestTime=0, totalSpoutLag=0, totalLatestEmittedOffset=0}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]>]]
> 68649 [Thread-33-words] INFO storm.kafka.ZkCoordinator - Refreshing partition manager connections
> 68649 [Thread-34-words] INFO storm.kafka.ZkCoordinator - Refreshing partition manager connections
> 68664 [Thread-33-words] INFO storm.kafka.ZkCoordinator - Deleted partition managers: []
> 68664 [Thread-33-words] INFO storm.kafka.ZkCoordinator - New partition managers: []
> 68664 [Thread-33-words] INFO storm.kafka.ZkCoordinator - Finished refreshing
> 68665 [Thread-34-words] INFO storm.kafka.ZkCoordinator - Deleted partition managers: []
> 68665 [Thread-34-words] INFO storm.kafka.ZkCoordinator - New partition managers: []
> 68665 [Thread-34-words] INFO storm.kafka.ZkCoordinator - Finished refreshing
>
> Thanks,
> Siddharth
>
> On Wed, Aug 13, 2014 at 5:33 AM, Sa Li <sa.in.v...@gmail.com> wrote:
>
>> Hi, All
>>
>> I am reading messages from the producer and printing the "time" and
>> "userhostaddress", but I get this warning once in a while:
>>
>> 184.146.220.124
>> 1403070062
>> 24.79.224.172
>> 1403070063
>> 71.199.4.138
>> 2644780 [Thread-16-spout0] WARN storm.artemis.kafka.KafkaUtils - No data found in Kafka Partition partition_2
>> 1403070064
>> 172.4.221.83
>> 2647191 [Thread-16-spout0] INFO storm.artemis.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
>> 2647195 [Thread-16-spout0] INFO storm.artemis.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.128:9092, 1=10.100.70.128:9092, 2=10.100.70.128:9092, 3=10.100.70.128:9092, 4=10.100.70.128:9092}}
>> 2648569 [Thread-8-$spoutcoord-spout0] INFO storm.artemis.kafka.trident.ZkBrokerReader - brokers need refreshing because 60000ms have expired
>> 2648573 [Thread-8-$spoutcoord-spout0] INFO storm.artemis.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.128:9092, 1=10.100.70.128:9092, 2=10.100.70.128:9092, 3=10.100.70.128:9092, 4=10.100.70.128:9092}}
>> 1403070068
>> 24.85.157.225
>> 1403070070
>> 24.114.78.75
>> 1403070070
>> 76.219.198.176
>> 1403070071
>> 142.166.228.205
>> 1403070071
>> 76.66.155.166
>> 1403070071
>> 172.56.10.86
>> 1403070071
>>
>> .
>> .
>> .
>> It says brokers need refreshing because 60000ms have expired, but I
>> didn't see 60000ms configured anywhere, so I wonder what this is about.
>> In addition, I started 3 Kafka brokers and created 5 partitions for the
>> topic "topictest":
>>
>> topic: topictest  partition: 0  leader: 1  replicas: 1,3,2  isr: 1,2,3
>> topic: topictest  partition: 1  leader: 1  replicas: 2,1,3  isr: 1,2,3
>> topic: topictest  partition: 2  leader: 1  replicas: 3,2,1  isr: 1,2,3
>> topic: topictest  partition: 3  leader: 1  replicas: 1,2,3  isr: 1,2,3
>> topic: topictest  partition: 4  leader: 1  replicas: 2,3,1  isr: 1,2,3
>>
>> What I don't understand is why I can't see a BrokerReader for the
>> brokers on 9093 and 9094, and what "No data found in Kafka Partition
>> partition_2" means.
>>
>> thanks
>>
>> Alec