Hi, Siddharth

I only used trident topology for kafka spout, since I thought I can easily
to add .each function to parse the stream. I do add two types of
storm-kafka packages in my pom, See this pom

     <!-- Storm-Kafka compiled -->
    <dependency>
            <artifactId>storm-kafka</artifactId>
            <groupId>org.apache.storm</groupId>
            <version>0.9.2-incubating</version>
  <!--
            <scope>*compile*</scope>
  -->
                   <!-- exclude the zookeeper package from storm-Kafka -->
                <exclusions>
                   <exclusion>
                       <groupId>org.apache.zookeeper</groupId>
                       <artifactId>zookeeper</artifactId>
                   </exclusion>
                </exclusions>
    </dependency>

     <dependency>
            <groupId>storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.0-wip16a-scala292</version>
                        <!-- exclude the zookeeper package from storm-Kafka
-->
                <exclusions>
                   <exclusion>
                       <groupId>org.apache.zookeeper</groupId>
                       <artifactId>zookeeper</artifactId>
                   </exclusion>
                </exclusions>
     </dependency>

When I firstly running the kafkaSpout, I got stuck for a while, after
talking to developers here back and forth, I realize the version conflict
is really an issue we need to pay attention, you must be make the
zookeeper, storm, kafka version consistent, otherwise you will have
problem, or you need to exclude it in pom.

Thanks

Alec


On Wed, Aug 13, 2014 at 7:20 AM, siddharth ubale <siddharth.ub...@gmail.com>
wrote:

> hi ,
>
> Just curious, did u face any isue with using kafka Spout if u did not use
> trident?
> Are u also able to implement the KafkaSpout packaged with Storm ?
> I am asking cos i am unable to use the kafkaSpout(SpoutConfig) to read
> from kafka topic. i am using kafka 8.1.1 and storm is 9.0.1 ....
> Can you lemme know about any issue u faced??
>
> i get no error while i submitting my program but only a never ending
> sequence of the follwing:
> 68582 [Thread-33-words] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
> 68588 [Thread-33-words] INFO  backtype.storm.daemon.task - Emitting: words
> __metrics [#<TaskInfo
> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@a92bbf9> [#<DataPoint
> [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1,
> capacity=1024, population=0}]> #<DataPoint [__complete-latency = {}]>
> #<DataPoint [__receive = {write_pos=3, read_pos=2, capacity=1024,
> population=1}]> #<DataPoint [kafkaOffset = {totalLatestTime=0,
> totalSpoutLag=0, totalLatestEmittedOffset=0}]> #<DataPoint
> [__transfer-count = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint
> [__emit-count = {}]>]]
> 68649 [Thread-33-words] INFO  storm.kafka.ZkCoordinator - Refreshing
> partition manager connections
> 68649 [Thread-34-words] INFO  storm.kafka.ZkCoordinator - Refreshing
> partition manager connections
> 68664 [Thread-33-words] INFO  storm.kafka.ZkCoordinator - Deleted
> partition managers: []
> 68664 [Thread-33-words] INFO  storm.kafka.ZkCoordinator - New partition
> managers: []
> 68664 [Thread-33-words] INFO  storm.kafka.ZkCoordinator - Finished
> refreshing
> 68665 [Thread-34-words] INFO  storm.kafka.ZkCoordinator - Deleted
> partition managers: []
> 68665 [Thread-34-words] INFO  storm.kafka.ZkCoordinator - New partition
> managers: []
> 68665 [Thread-34-words] INFO  storm.kafka.ZkCoordinator - Finished
> refreshing
>
> Thanks,
> Siddharth
>
>
>
> On Wed, Aug 13, 2014 at 5:33 AM, Sa Li <sa.in.v...@gmail.com> wrote:
>
>> Hi, All
>>
>> I am reading the messages from producer and print the "time" and
>> "userhostaddress", but I am getting such warning once in a while:
>>
>> 184.146.220.124
>> 1403070062
>> 24.79.224.172
>> 1403070063
>> 71.199.4.138
>> 2644780 [Thread-16-spout0] WARN  storm.artemis.kafka.KafkaUtils - No data
>> found in Kafka Partition partition_2
>> 1403070064
>> 172.4.221.83
>> 2647191 [Thread-16-spout0] INFO
>> storm.artemis.kafka.trident.ZkBrokerReader - brokers need refreshing
>> because 60000ms have expired
>> 2647195 [Thread-16-spout0] INFO  storm.artemis.kafka.DynamicBrokersReader
>> - Read partition info from zookeeper:
>> GlobalPartitionInformation{partitionMap={0=10.100.70.128:9092, 1=
>> 10.100.70.128:9092, 2=10.100.70.128:9092, 3=10.100.70.128:9092, 4=
>> 10.100.70.128:9092}}
>> 2648569 [Thread-8-$spoutcoord-spout0] INFO
>> storm.artemis.kafka.trident.ZkBrokerReader - brokers need refreshing
>> because 60000ms have expired
>> 2648573 [Thread-8-$spoutcoord-spout0] INFO
>> storm.artemis.kafka.DynamicBrokersReader - Read partition info from
>> zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.128:9092,
>> 1=10.100.70.128:9092, 2=10.100.70.128:9092, 3=10.100.70.128:9092, 4=
>> 10.100.70.128:9092}}
>> 1403070068
>> 24.85.157.225
>> 1403070070
>> 24.114.78.75
>> 1403070070
>> 76.219.198.176
>> 1403070071
>> 142.166.228.205
>> 1403070071
>> 76.66.155.166
>> 1403070071
>> 172.56.10.86
>> 1403070071
>>
>> .
>> .
>> .
>> It says brokers need refreshing because 60000ms have expired, I didn't
>> see any 60000ms being configured anywhere, I wonder what this issue is. In
>> addition, I started 3 kafka brokers, and 5 partitions for the "topictest",
>>
>> topic: topictest        partition: 0    leader: 1       replicas: 1,3,2
>> isr: 1,2,3
>> topic: topictest        partition: 1    leader: 1       replicas: 2,1,3
>> isr: 1,2,3
>> topic: topictest        partition: 2    leader: 1       replicas: 3,2,1
>> isr: 1,2,3
>> topic: topictest        partition: 3    leader: 1       replicas: 1,2,3
>> isr: 1,2,3
>> topic: topictest        partition: 4    leader: 1       replicas: 2,3,1
>> isr: 1,2,3
>>
>> What I don't understand is that I couldn't see BrokerReader for the
>> brokers 9093 and 9094. And also what that means "No data found in Kafka
>> Partition partition_2".
>>
>>
>> thanks
>>
>> Alec
>>
>>
>

Reply via email to