sometimes netty connection fails
Dear all, When running a topology in cluster mode, worker cannot connect to Netty in other node like below. Is there any possible reason for this? Thanks in advance 2014-08-26 18:11:42 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-somehostname/10.someip:6700... [30] 2014-08-26 18:11:43 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-somehostname/10.someip:67002014-08-26 18:11:43 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-somehostname/10.someip:6700..., timeout: 60ms, pendings: 0
MultiCountMetric is empty
Dear all, Imagine there a 3 bolts A,B and C linked to one another and one metric consumer(IMetricConsumer implementation) Inside the execute method in each bolt, there is a small piece of code sending metric to consumer AMetric.scope(cnt).incr(); BMetric.scope(cnt).incr(); CMetric.scope(cnt).incr(); In prepare method of each bolt, I declared each metric like below BMetric = new MultiCountMetric(); context.registerMetric(B, BMetric, 2); In topology declaration, I registered metric consumer class obviously like below conf.registerMetricsConsumer(Consumer.class, 2); In handleDataPoints method of consumer, I tried to aggregate metric like belowfor (IMetricsConsumer.DataPoint p : dataPoints) { if (p.name.equals(A)) { System.out.println(((Map) p.value).get(cnt); } else if (p.name.equals(B)) { System.out.println(((Map) p.value).get(cnt); } else if (p.name.equals(C)) { System.out.println(((Map) p.value).get(cnt); } } but here, metric from Bolt A arrives as expected but other than that, p.value is empty If there is a mistake, can you tell me what's the reason for this? Thanks in advance.
Can you share your feeling about storm with mesos, if tried?
I'm considering of running storm on top of mesos, which is cluster management tool (apache opensource). and recently Nathan wrote storm-mesos for running storm on top of mesos. I want to ask one simple question if you have tried that. Is there any explicit advantage over running just storm? If yes, can you share your experience? Sincerely,
storm with redis, java.lang.IllegalStateException: Returned object not currently part of this pool
java.lang.RuntimeException: redis.clients.jedis.exceptions.JedisException: Could not return the resource to the pool at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) Caused by: redis.clients.jedis.exceptions.JedisException: Could not return the resource to the pool at redis.clients.util.Pool.returnResourceObject(Pool.java:54) at redis.clients.jedis.JedisPool.returnResource(JedisPool.java:98) at com.naver.labs.nelo2.notifier.bolt.ThresholdCheckBolt.execute(ThresholdCheckBolt.java:143) at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399) at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ... 6 more Caused by: java.lang.IllegalStateException: Returned object not currently part of this pool at org.apache.commons.pool2.impl.GenericObjectPool.returnObject(GenericObjectPool.java:534) at redis.clients.util.Pool.returnResourceObject(Pool.java:52) ... 13 more Hi all, I'm seeing very weird error message like above. In certain bolt, it communicate with redis. When I set parallelism hint to 32, it works fine. but when I set parallelism hint to some number bigger than 32, that error occurs. currently I have 2 supervisor nodes of storm, each has 1 worker. Is it Storm's problem or Jedis's, any clue will be appreciated.
Can I make a bolt to do a batch job?
Hi all, Assume that there are one spout and one bolt. and spout emits a single log, and bolt stores it into DB for example. and they are connected using shuffle grouping. It comes to my mind that if a bolt can wait for 10 tuples to come from spout, and request batch insert to DB, it might be beneficial performancewise. Is there any way to make a bolt gather tuples until certain size and do it's job?
Re: Can I make a bolt to do a batch job?
great, I think I have to try this right away. so the tick tuples are generated periodically and flows through every single executor within the topology? I read some articles online just now, and sounds to me like I have to write code for two cases for every bolt; normal tuple case and tick tuple case. If my understanding is correct, is there any way to make tick tuple apply to certain bolt only? -Original Message- From: Srinath Clt;srinat...@gmail.comgt; To: userlt;user@storm.incubator.apache.orggt;; 이승진lt;sweetest...@navercorp.comgt;; Cc: Sent: 2014-07-11 (금) 17:17:54 Subject: Re: Can I make a bolt to do a batch job? You could use tick tuples to do that. The bolt can be configured to receive periodic ticks which you can use to do the batch insert. On Fri, Jul 11, 2014 at 1:35 PM, 이승진 lt;sweetest...@navercorp.comgt; wrote: Hi all, Assume that there are one spout and one bolt. and spout emits a single log, and bolt stores it into DB for example. and they are connected using shuffle grouping. It comes to my mind that if a bolt can wait for 10 tuples to come from spout, and request batch insert to DB, it might be beneficial performancewise. Is there any way to make a bolt gather tuples until certain size and do it's job?
Re: storm crashes when one of the zookeeper server dies
Actually, it was not just a zookeeper process down, but whole machine dies due to kernel panic, and ping to that server failed accordingly.All servers were connected to each other and in replication mode, of course. -Original Message- From: Irek Khasyanovlt;qua...@gmail.comgt; To: userlt;user@storm.incubator.apache.orggt;; 이승진lt;sweetest...@navercorp.comgt;; Cc: Sent: 2014-07-04 (금) 14:51:00 Subject: Re: storm crashes when one of the zookeeper server dies Are you sure all zookeeper servers are in replication mode and can connect each other? Yesterday we added zookeeper cluster and checked what happens with storm when one zookeeper fails. Everything was good, nothing happens with topology. On 4 July 2014 08:26, 이승진 lt;sweetest...@navercorp.comgt; wrote: in storm.yaml, we listed 3 zookeeper servers storm.zookeeper.servers: -host1 -host2 -host3 and host1 dies unexpectedly today morning, since then, not only I cannot connect to storm UI (TTransport exception) but also can't execute any of storm command. I was quite worried when this happened, because if it's what storm is supposed to be, one of the zookeeper server can be a SPOF. seems like it should be fixed ASAP Sincerly, -- With best regards, Irek Khasyanov.
KafkaSpout tps decreases as time passes
Hi all, I have a topology which includes only single spout from 3 partitions of one topic. and what I'm seeing from storm UI is quite weird at first 10minute and in its early phase, tps is about 16~17k but after running it for 2~3 hours, tps drops to 5~6k There are more than 900M logs in that topic and I read it from first. Strange thing here is, only one of worker machines' memory consumption is very high. It grows continuously even though I set worker.JAVA_OPTS.xmx(sorry I don't remember exact name of that configuration) to half of physical memory. But the other worker machines consume memory only about 10% of their own physical memory. can you guess any reason of what's happening to me? Sincerly,
storm crashes when one of the zookeeper server dies
in storm.yaml, we listed 3 zookeeper servers storm.zookeeper.servers: -host1 -host2 -host3 and host1 dies unexpectedly today morning, since then, not only I cannot connect to storm UI (TTransport exception) but also can't execute any of storm command. I was quite worried when this happened, because if it's what storm is supposed to be, one of the zookeeper server can be a SPOF. seems like it should be fixed ASAP Sincerly,
Re: Spout fail logic
No storm does not take care of replaying message It just calls spout's fail method When you look at KafkaSpout's fail method, there's a logic in case of failure. Currently, it fetches buffer size of data again from the offset and checks if they are already processed, and if not, replay that message. In short, exactly-once message processing is not guaranteed by storm but implemented by KafkaSpout in its own way. -Original Message- From: lt;francesco.masu...@pronetics.itgt; To: lt;user@storm.incubator.apache.orggt;; Cc: Sent: 2014-07-03 (목) 22:45:27 Subject: Re: Spout fail logic Storm will take care of the replay of the message Francesco Maria Masucci Il 2014-07-03 15:15 Tomas Mazukna ha scritto: If a spout receives fail on the message, will storm replay the message, or spout needs to re-emit it again?-- Tomas Mazukna
RE: storm 0.9.2-incubating, nimbus is not launching.
Finally found the reason why. Quick solution : remove storm node recursively from your zookeeper and rerun. Old data in zookeeper storm node conflicts with newly installed storm, and that causes serialVersionUID mismatch. referred to https://github.com/Parsely/streamparse/issues/27 -Original Message- From: 이승진lt;sweetest...@navercorp.comgt; To: lt;user@storm.incubator.apache.orggt;; Cc: Sent: 2014-07-01 (화) 11:55:04 Subject: storm 0.9.2-incubating, nimbus is not launching. Hi all, Today I tried to upgrade storm version to 0.9.2-incubating but I think it has a problem with a zookeeper when I run nimbus with storm nibmus command, it returns following error and process halts. I'm using zookeeper 3.4.4, is there any compatibility issue? java.lang.RuntimeException: java.io.InvalidClassException: backtype.storm.daemon.common.SupervisorInfo; local class incompatible: stream classdesc serialVersionUID = 7648414326720210054, local class serialVersionUID = 7463898661547835557at backtype.storm.utils.Utils.deserialize(Utils.java:93) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.cluster$maybe_deserialize.invoke(cluster.clj:200) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.cluster$mk_storm_cluster_state$reify__2284.supervisor_info(cluster.clj:299) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_25] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_25]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_25]at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_25]at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.5.1.jar:na]at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.5.1.jar:na]at backtype.storm.daemon.nimbus$all_supervisor_info$fn__4715.invoke(nimbus.clj:277) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at clojure.core$map$fn__4207.invoke(core.clj:2487) ~[clojure-1.5.1.jar:na] at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]at clojure.core$mapcat.doInvoke(core.clj:2514) ~[clojure-1.5.1.jar:na]at clojure.lang.RestFn.invoke(RestFn.java:423) ~[clojure-1.5.1.jar:na]at backtype.storm.daemon.nimbus$all_supervisor_info.invoke(nimbus.clj:275) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.daemon.nimbus$all_scheduling_slots.invoke(nimbus.clj:288) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.daemon.nimbus$compute_new_topology__GT_executor__GT_node_PLUS_port.invoke(nimbus.clj:580) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:660) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at clojure.lang.RestFn.invoke(RestFn.java:410) ~[clojure-1.5.1.jar:na]at backtype.storm.daemon.nimbus$fn__5210$exec_fn__1396__auto5211$fn__5216$fn__5217.invoke(nimbus.clj:905) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.daemon.nimbus$fn__5210$exec_fn__1396__auto5211$fn__5216.invoke(nimbus.clj:904) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]at java.lang.Thread.run(Thread.java:724) ~[na:1.7.0_25]Caused by: java.io.InvalidClassException: backtype.storm.daemon.common.SupervisorInfo; local class incompatible: stream classdesc serialVersionUID = 7648414326720210054, local class serialVersionUID = 7463898661547835557 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) ~[na:1.7.0_25]at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) ~[na:1.7.0_25]at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) ~[na:1.7.0_25]at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769) ~[na:1.7.0_25]at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) ~[na:1.7.0_25]at
storm netty connection failed error
Hi all, I just upgraded storm to 0.9.2-incubating (previously using 0.9.0.1), launched a simple topology, and got an error related to Netty Do I have to prepare Netty seperately or is there any reason of exception below? Any comments would help, thanks in advance. 2014-07-01 15:04:48 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-some-ip:6703java.nio.channels.UnresolvedAddressException: null at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_60]at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) ~[na:1.7.0_60] at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.connect(NioClientSocketPipelineSink.java:140) ~[netty-3.2.2.Final.jar:na]at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:103) ~[netty-3.2.2.Final.jar:na]at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60) ~[netty-3.2.2.Final.jar:na]at org.jboss.netty.channel.Channels.connect(Channels.java:541) [netty-3.2.2.Final.jar:na]at org.jboss.netty.channel.AbstractChannel.connect(AbstractChannel.java:218) [netty-3.2.2.Final.jar:na]at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:227) [netty-3.2.2.Final.jar:na]at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:188) [netty-3.2.2.Final.jar:na]at backtype.storm.messaging.netty.Client.connect(Client.java:147) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.messaging.netty.Client.access$000(Client.java:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.messaging.netty.Client$1.run(Client.java:104) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_60]at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_60] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_60]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) [na:1.7.0_60]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_60]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_60]at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
lack of information on storm 0.9.2 UI
Hi all, Recently I upgraded storm to 0.9.2, and on Storm UI above after launching a topology, only those information is shown. information about each bolt and worker seems missing, is it supposed to be like this in 0.9.2? thanks
storm 0.9.2-incubating, nimbus is not launching.
Hi all, Today I tried to upgrade storm version to 0.9.2-incubating but I think it has a problem with a zookeeper when I run nimbus with storm nibmus command, it returns following error and process halts. I'm using zookeeper 3.4.4, is there any compatibility issue? java.lang.RuntimeException: java.io.InvalidClassException: backtype.storm.daemon.common.SupervisorInfo; local class incompatible: stream classdesc serialVersionUID = 7648414326720210054, local class serialVersionUID = 7463898661547835557at backtype.storm.utils.Utils.deserialize(Utils.java:93) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.cluster$maybe_deserialize.invoke(cluster.clj:200) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.cluster$mk_storm_cluster_state$reify__2284.supervisor_info(cluster.clj:299) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_25] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_25]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_25]at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_25]at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.5.1.jar:na]at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.5.1.jar:na]at backtype.storm.daemon.nimbus$all_supervisor_info$fn__4715.invoke(nimbus.clj:277) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at clojure.core$map$fn__4207.invoke(core.clj:2487) ~[clojure-1.5.1.jar:na] at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]at clojure.core$mapcat.doInvoke(core.clj:2514) ~[clojure-1.5.1.jar:na]at clojure.lang.RestFn.invoke(RestFn.java:423) ~[clojure-1.5.1.jar:na]at backtype.storm.daemon.nimbus$all_supervisor_info.invoke(nimbus.clj:275) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.daemon.nimbus$all_scheduling_slots.invoke(nimbus.clj:288) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.daemon.nimbus$compute_new_topology__GT_executor__GT_node_PLUS_port.invoke(nimbus.clj:580) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:660) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at clojure.lang.RestFn.invoke(RestFn.java:410) ~[clojure-1.5.1.jar:na]at backtype.storm.daemon.nimbus$fn__5210$exec_fn__1396__auto5211$fn__5216$fn__5217.invoke(nimbus.clj:905) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.daemon.nimbus$fn__5210$exec_fn__1396__auto5211$fn__5216.invoke(nimbus.clj:904) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]at java.lang.Thread.run(Thread.java:724) ~[na:1.7.0_25]Caused by: java.io.InvalidClassException: backtype.storm.daemon.common.SupervisorInfo; local class incompatible: stream classdesc serialVersionUID = 7648414326720210054, local class serialVersionUID = 7463898661547835557 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) ~[na:1.7.0_25]at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) ~[na:1.7.0_25]at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) ~[na:1.7.0_25]at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769) ~[na:1.7.0_25]at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) ~[na:1.7.0_25]at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) ~[na:1.7.0_25] at backtype.storm.utils.Utils.deserialize(Utils.java:89) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]... 29 common frames omitted
storm-kafka : EOFException: Received -1 when reading from channel, socket has likely been closed.
I'm trying to use storm-kafka 0.9.0-wip16a-scala292 below is my spout config Listlt;HostPortgt; hosts = new ArrayListlt;HostPortgt;(); hosts.add(new HostPort(host1,2181));hosts.add(new HostPort(host2,2181));SpoutConfig config = new SpoutConfig(new KafkaConfig.StaticHosts(hosts,3),logs, /storm,// path where brokers and consumers exist eg. /storm/borkers/topics/... and /storm/consumers/... topologyName);normalLogSpoutConfig.forceStartOffsetTime(-1); KafkaSpout normalLogSpout = new KafkaSpout(normalLogSpoutConfig); I got this error and worker dies. INFO kafka.consumer.SimpleConsumer - Reconnect in get offetset request due to socket error: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. I'm using kafka_2.9.2-0.8.1, so is it a version problem? or am I using wrong path?
RE: storm-kafka : EOFException: Received -1 when reading from channel, socket has likely been closed.
hi,You can use this veersion: thanks, I already tried this one and worked fine. but I'm afraid of using this one because it's not official and wurstmeister said he does not maintain this anymore. anyway, thanks for reply. Sincerely -Original Message- From: 傅駿浩lt;jamesw...@yahoo.com.twgt; To: user@storm.incubator.apache.orglt;user@storm.incubator.apache.orggt;; 이승진lt;sweetest...@navercorp.comgt;; Cc: Sent: 2014-06-23 (월) 12:59:43 Subject: RE: storm-kafka : EOFException: Received -1 when reading from channel, socket has likely been closed. hi,You can use this veersion:lt;dependencygt; lt;groupIdgt;net.wurstmeister.stormlt;/groupIdgt; lt;artifactIdgt;storm-kafka-0.8-pluslt;/artifactIdgt; lt;versiongt;0.4.0lt;/versiongt; lt;/dependencygt;This version works well with kafka 0.8.1.1 and storm 0.9.1 but with some bugs. If you want, you can git clone the latest version from github:https://github.com/apache/incubator-stormBut you need to change your storm version to latest. James 이승진 lt;sweetest...@navercorp.comgt; 於 2014/6/23 (週一) 9:44 AM 寫道﹕ I'm trying to use storm-kafka 0.9.0-wip16a-scala292 below is my spout config Listlt;HostPortgt; hosts = new ArrayListlt;HostPortgt;(); hosts.add(new HostPort(host1,2181));hosts.add(new HostPort(host2,2181));SpoutConfig config = new SpoutConfig(new KafkaConfig.StaticHosts(hosts,3),logs, /storm,// path where brokers and consumers exist eg. /storm/borkers/topics/... and /storm/consumers/... topologyName);normalLogSpoutConfig.forceStartOffsetTime(-1); KafkaSpout normalLogSpout = new KafkaSpout(normalLogSpoutConfig); I got this error and worker dies. INFO kafka.consumer.SimpleConsumer - Reconnect in get offetset request due to socket error: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. I'm using kafka_2.9.2-0.8.1, so is it a version problem? or am I using wrong path?
Can I not use tuple timeout?
Dear all, AFAIK, spout 'assumes' processing a tuple is failed when it does not accept ack in timeout interval. But I found that even if spout's fail method is called, that tuple is still running through topology i.e other bolts are still processing that tuple. So actually failed tuple is not failed but just delayed. I think I can configure timeout value bigger, but I want to know if there's a way to avoid using spout's timeout Sincerly
getting tuple by messageId
Hi all, I'm using BaseRichSpout and to my knowledge, I have to implement the replay logic to guarantee message processing. When I see the fail method of BaseRichSpout,public void fail(java.lang.Object msgId)it just gives me messageId.and of course I set collector in open method. so I think if I make fail method to do collector.emit(failed tuple), I can guarantee exactly-once message processing.Is there any way to get a tuple in a tuple tree by its Id? or do I have to fetch again from some source? thanks