No, unfortunately I cannot see any errors in the log; there are only some warnings (log file attached). Everything seems to be working, but my spout does not receive any tuples. I have already restarted Zookeeper, Kafka, and Storm, but nothing changed.
I use Kafka version 0.9.0.1 (Scala 2.11).
Thank you and regards,
Daniela
Sent: Thursday, 26 May 2016 at 05:13
From: "Matthias J. Sax" <[email protected]>
To: [email protected]
Subject: Re: Error when deploying a topology
No errors in the log?
What Kafka version do you use?
On 05/26/2016 11:33 AM, Daniela S wrote:
> Thank you very much, that solved the problem.
>
> I can now deploy my topology, but it seems that it does not consume
> any messages from Kafka. The topology is running, my producer is
> generating messages, and I can see those messages in the console
> consumer, but my spout does not emit any tuples.
>
> Here are parts of my code:
> *Producer *(JSON Generator using
> https://github.com/acesinc/json-data-generator):
> {
>     "workflows": [{
>         "workflowName": "test",
>         "workflowFilename": "exampleWorkflow.json"
>     }],
>     "producers": [{
>         "type": "kafka",
>         "broker.server": "localhost",
>         "broker.port": 9092,
>         "topic": "logevent",
>         "flatten": false,
>         "sync": false
>     }, {
>         "type": "logger"
>     }]
> }
>
> *Workflow: *
> {
>     "eventFrequency": 60000,
>     "varyEventFrequency": true,
>     "repeatWorkflow": true,
>     "timeBetweenRepeat": 60000,
>     "varyRepeatFrequency": true,
>     "steps": [{
>         "config": [{
>             "timestamp": "nowTimestamp()",
>             "ID1": "integer(1,1000)",
>             "ID2": "integer(1,100)",
>             "command": "random('ON','OFF')"
>         }],
>         "duration": 0
>     }]
> }
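[Editor's note: for reference, a single event produced by the workflow above should be a flat JSON object along the following lines (the values are illustrative, not actual generator output). Note that ID1 and ID2 are generated as integers, while the bolt shown further down reads every field with getString.]

```json
{
    "timestamp": "2016-05-26T11:33:00.000Z",
    "ID1": 423,
    "ID2": 17,
    "command": "ON"
}
```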
>
> *Topology:*
> public class Topology {
>
>     public static void main(String[] args) throws AlreadyAliveException,
>             InvalidTopologyException, AuthorizationException {
>
>         ZkHosts zkHosts = new ZkHosts("localhost:2181");
>         SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "logevent", "", "id7");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
>         builder.setBolt("StoreToRedisBolt", new StoreToRedisBolt("127.0.0.1", 6379), 1)
>                .globalGrouping("KafkaSpout");
>
>         LocalCluster cluster = new LocalCluster();
>         Config conf = new Config();
>         StormSubmitter.submitTopology("Topology", conf, builder.createTopology());
>
>         try {
>             System.out.println("Waiting to consume from kafka");
>             Thread.sleep(10000);
>         } catch (Exception exception) {
>             System.out.println("Thread interrupted exception : " + exception);
>         }
>     }
> }
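[Editor's note: one thing that stands out in the Topology class above is that a LocalCluster is created but never used; the topology is submitted via StormSubmitter, which targets a remote cluster and requires a running Nimbus. If the intent was purely local testing, the usual pattern is sketched below. This is a sketch only, assuming storm-core 1.0.0 on the classpath; it is not runnable standalone, and it is not necessarily the cause of the missing tuples.]

```java
// Sketch: submit to the in-process cluster instead of a remote Nimbus.
LocalCluster cluster = new LocalCluster();
Config conf = new Config();
cluster.submitTopology("Topology", conf, builder.createTopology());

// In local mode the topology dies when the JVM exits, so keep the
// process alive long enough for the spout to poll Kafka.
Thread.sleep(60000);

cluster.killTopology("Topology");
cluster.shutdown();
```

With StormSubmitter, by contrast, main() returning after the 10-second sleep is harmless: the topology keeps running on the cluster.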
> *Bolt*
> public class StoreToRedisBolt implements IBasicBolt {
>
>     private static final long serialVersionUID = 2L;
>     private RedisOperations redisOperations = null;
>     private String redisIP = null;
>     private int port;
>
>     public StoreToRedisBolt(String redisIP, int port) {
>         this.redisIP = redisIP;
>         this.port = port;
>     }
>
>     public void execute(Tuple input, BasicOutputCollector collector) {
>         Map<String, Object> record = new HashMap<String, Object>();
>         String tupleString = input.getString(0);
>         JSONObject obj = new JSONObject(tupleString);
>         record.put("timestamp", obj.getString("timestamp"));
>         record.put("ID1", obj.getString("ID1"));
>         record.put("ID2", obj.getString("ID2"));
>         record.put("command", obj.getString("command"));
>         String id = obj.getString("ID1") + obj.getString("ID2");
>         redisOperations.insert(record, id);
>     }
>
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     }
>
>     public Map<String, Object> getComponentConfiguration() {
>         return null;
>     }
>
>     public void prepare(Map stormConf, TopologyContext context) {
>         redisOperations = new RedisOperations(this.redisIP, this.port);
>     }
>
>     public void cleanup() {
>     }
> }
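[Editor's note: a small aside on the Redis key built in execute() above. Concatenating ID1 and ID2 directly means that distinct ID pairs can map to the same key, e.g. "1"+"11" and "11"+"1" both yield "111". The self-contained snippet below illustrates the collision; `delimitedId` is a hypothetical fix, not part of the original code.]

```java
public class IdCheck {

    // Mirrors the bolt's key construction: plain concatenation of the two IDs.
    static String naiveId(String id1, String id2) {
        return id1 + id2;
    }

    // Hypothetical fix: a delimiter keeps the two fields unambiguous.
    static String delimitedId(String id1, String id2) {
        return id1 + ":" + id2;
    }

    public static void main(String[] args) {
        // Distinct ID pairs collide under plain concatenation...
        System.out.println(naiveId("1", "11").equals(naiveId("11", "1")));         // prints true
        // ...but stay distinct once delimited.
        System.out.println(delimitedId("1", "11").equals(delimitedId("11", "1"))); // prints false
    }
}
```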
>
> Thank you so much for your help in advance!
>
> Regards
>
> *Sent:* Wednesday, 25 May 2016 at 16:45
> *From:* "Matthias J. Sax" <[email protected]>
> *To:* [email protected]
> *Subject:* Re: Error when deploying a topology
> Is com/google/common/cache/Cache contained in your jar file?
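[Editor's note for readers hitting the same NoSuchMethodError shown further down: it is the classic symptom of two Guava versions on the classpath, where Curator needs a newer CacheBuilder API than the one being loaded. One common remedy is to pin a single Curator-compatible Guava version in the topology's pom.xml. The fragment below is a sketch; the version number shown is an assumption and must be matched to your Curator release.]

```xml
<!-- Sketch: force one Curator-compatible Guava version onto the classpath.
     16.0.1 is an assumed version; check your Curator release notes. -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>16.0.1</version>
</dependency>
```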
>
> On 05/25/2016 10:35 PM, Daniela S wrote:
>> Hi
>>
>> I am trying to deploy a simple topology that reads data from Kafka
>> and writes it to Redis. Unfortunately, I receive the following error
>> message:
>>
>> 13659 [Thread-9] INFO o.a.s.d.worker - Worker 741ffc04-f217-41e7-b942-490c1895e7d7 for storm Topology-1-1464208228 on 66fa5b8f-87ee-42ee-8436-4fed29a20580:1024 has finished loading
>> 13660 [Thread-9] INFO o.a.s.config - SET worker-user 741ffc04-f217-41e7-b942-490c1895e7d7
>> 14512 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor - Preparing bolt __system:(-1)
>> 14536 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor - Prepared bolt __system:(-1)
>> 14553 [Thread-15-StoreToRedisBolt-executor[2 2]] INFO o.a.s.d.executor - Preparing bolt StoreToRedisBolt:(2)
>> 14588 [Thread-19-KafkaSpout-executor[1 1]] INFO o.a.s.d.executor - Opening spout KafkaSpout:(1)
>> 14597 [Thread-17-__acker-executor[3 3]] INFO o.a.s.d.executor - Preparing bolt __acker:(3)
>> 14608 [Thread-17-__acker-executor[3 3]] INFO o.a.s.d.executor - Prepared bolt __acker:(3)
>> 14635 [Thread-15-StoreToRedisBolt-executor[2 2]] INFO o.a.s.d.executor - Prepared bolt StoreToRedisBolt:(2)
>> 14709 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.util - Async loop died!
>> java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.build()Lcom/google/common/cache/Cache;
>>     at org.apache.curator.framework.imps.NamespaceWatcherMap.<init>(NamespaceWatcherMap.java:33) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:81) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:145) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:100) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.ZkState.newCurator(ZkState.java:45) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.ZkState.<init>(ZkState.java:61) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.daemon.executor$fn__8158$fn__8173.invoke(executor.clj:602) ~[storm-core-1.0.0.jar:1.0.0]
>>     at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:482) [storm-core-1.0.0.jar:1.0.0]
>>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>> 14712 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.d.executor -
>> java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.build()Lcom/google/common/cache/Cache;
>>     at org.apache.curator.framework.imps.NamespaceWatcherMap.<init>(NamespaceWatcherMap.java:33) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:81) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:145) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:100) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.ZkState.newCurator(ZkState.java:45) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.ZkState.<init>(ZkState.java:61) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[realtime-v1-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>     at org.apache.storm.daemon.executor$fn__8158$fn__8173.invoke(executor.clj:602) ~[storm-core-1.0.0.jar:1.0.0]
>>     at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:482) [storm-core-1.0.0.jar:1.0.0]
>>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>> 14774 [Thread-19-KafkaSpout-executor[1 1]] ERROR o.a.s.util - Halting process: ("Worker died")
>> java.lang.RuntimeException: ("Worker died")
>>     at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.0.jar:1.0.0]
>>     at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>>     at org.apache.storm.daemon.worker$fn__8827$fn__8828.invoke(worker.clj:758) [storm-core-1.0.0.jar:1.0.0]
>>     at org.apache.storm.daemon.executor$mk_executor_data$fn__8046$fn__8047.invoke(executor.clj:271) [storm-core-1.0.0.jar:1.0.0]
>>     at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:494) [storm-core-1.0.0.jar:1.0.0]
>>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>>
>> Does anyone know what may be the problem?
>>
>> Thank you for your help!
>>
>> Regards,
>> Daniela
>>
>>
>
[Attachment: log (binary data)]