sometimes netty connection fails

2014-08-26 Thread 이승진
Dear all,
When running a topology in cluster mode, a worker cannot make a Netty connection to another node, as shown below.
Is there any possible reason for this?
Thanks in advance
2014-08-26 18:11:42 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-somehostname/10.someip:6700... [30]
2014-08-26 18:11:43 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-somehostname/10.someip:6700
2014-08-26 18:11:43 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-somehostname/10.someip:6700..., timeout: 60ms, pendings: 0


MultiCountMetric is empty

2014-08-26 Thread 이승진
Dear all,
 
Imagine there are 3 bolts A, B and C linked to one another, and one metrics consumer (an IMetricsConsumer implementation).
 
 
Inside the execute method of each bolt, there is a small piece of code updating the metric:

AMetric.scope("cnt").incr();
BMetric.scope("cnt").incr();
CMetric.scope("cnt").incr();

In the prepare method of each bolt, I declared each metric like below:

BMetric = new MultiCountMetric();
context.registerMetric("B", BMetric, 2);

 
In the topology declaration, I registered the metrics consumer class like below:

conf.registerMetricsConsumer(Consumer.class, 2);


In the handleDataPoints method of the consumer, I tried to aggregate the metrics like below:

for (IMetricsConsumer.DataPoint p : dataPoints) {
    if (p.name.equals("A")) {
        System.out.println(((Map) p.value).get("cnt"));
    } else if (p.name.equals("B")) {
        System.out.println(((Map) p.value).get("cnt"));
    } else if (p.name.equals("C")) {
        System.out.println(((Map) p.value).get("cnt"));
    }
}
 
Here, the metric from bolt A arrives as expected, but for the other bolts p.value is empty.

If I have made a mistake, can you tell me what the reason is?
 
 
Thanks in advance.
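[Editor's note] One thing worth checking: MultiCountMetric.getValueAndReset() returns the current scope-to-count map and clears it at the end of each metrics interval, so a bolt that did not increment during an interval arrives at the consumer with an empty map. A stdlib-only stand-in (not Storm's actual class) illustrating that reset semantics:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Storm's MultiCountMetric: per-scope counters where
// getValueAndReset() hands back the snapshot and clears the counts.
public class MultiCountSketch {
    private Map<String, Long> counts = new HashMap<>();

    public void incr(String scope) {
        counts.merge(scope, 1L, Long::sum);
    }

    // Called once per metrics interval; afterwards the map starts empty again.
    public Map<String, Long> getValueAndReset() {
        Map<String, Long> snapshot = counts;
        counts = new HashMap<>();
        return snapshot;
    }

    public static void main(String[] args) {
        MultiCountSketch m = new MultiCountSketch();
        m.incr("cnt");
        m.incr("cnt");
        System.out.println(m.getValueAndReset()); // {cnt=2}
        System.out.println(m.getValueAndReset()); // {} -- empty until incremented again
    }
}
```

So an empty p.value for bolts B and C may simply mean those bolts did not call incr() between two consecutive reporting intervals.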
 

 


Can you share your feeling about storm with mesos, if tried?

2014-08-04 Thread 이승진
 
I'm considering running Storm on top of Mesos, which is a cluster management tool (an Apache open-source project).
 
Recently Nathan wrote storm-mesos for running Storm on top of Mesos.
 
I want to ask one simple question if you have tried it:
 
is there any explicit advantage over running plain Storm? If yes, can you share your experience?
 
Sincerely,




storm with redis, java.lang.IllegalStateException: Returned object not currently part of this pool

2014-07-21 Thread 이승진

java.lang.RuntimeException: redis.clients.jedis.exceptions.JedisException: Could not return the resource to the pool
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
	at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746)
	at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431)
	at clojure.lang.AFn.run(AFn.java:24)
	at java.lang.Thread.run(Thread.java:745)
Caused by: redis.clients.jedis.exceptions.JedisException: Could not return the resource to the pool
	at redis.clients.util.Pool.returnResourceObject(Pool.java:54)
	at redis.clients.jedis.JedisPool.returnResource(JedisPool.java:98)
	at com.naver.labs.nelo2.notifier.bolt.ThresholdCheckBolt.execute(ThresholdCheckBolt.java:143)
	at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
	at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631)
	at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399)
	at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58)
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120)
	... 6 more
Caused by: java.lang.IllegalStateException: Returned object not currently part of this pool
	at org.apache.commons.pool2.impl.GenericObjectPool.returnObject(GenericObjectPool.java:534)
	at redis.clients.util.Pool.returnResourceObject(Pool.java:52)
	... 13 more
Hi all,
 
I'm seeing a very weird error message like the above.
 
A certain bolt communicates with Redis.
 
When I set the parallelism hint to 32, it works fine, but when I set it to some number bigger than 32, that error occurs.
 
Currently I have 2 supervisor nodes of Storm, each with 1 worker.
 
Is it Storm's problem or Jedis's? Any clue will be appreciated.


Can I make a bolt to do a batch job?

2014-07-11 Thread 이승진
Hi all,
 
Assume that there are one spout and one bolt.
 
The spout emits a single log, and the bolt stores it into a DB, for example.
 
They are connected using shuffle grouping.
 
It occurs to me that if the bolt could wait for 10 tuples to come from the spout and then request a batch insert to the DB, it might be beneficial performance-wise.
 
Is there any way to make a bolt gather tuples until a certain size and then do its job?


Re: Can I make a bolt to do a batch job?

2014-07-11 Thread 이승진
Great, I think I have to try this right away.
 
So the tick tuples are generated periodically and flow through every single executor within the topology?
 
I read some articles online just now, and it sounds like I have to write code for two cases in every bolt: the normal tuple case and the tick tuple case.
 
If my understanding is correct, is there any way to make tick tuples apply to a certain bolt only?
 
-Original Message-
From: Srinath C <srinat...@gmail.com>
To: user <user@storm.incubator.apache.org>;
이승진 <sweetest...@navercorp.com>;
Cc: 
Sent: 2014-07-11 (금) 17:17:54
Subject: Re: Can I make a bolt to do a batch job?
 
You could use tick tuples to do that. The bolt can be configured to receive 
periodic ticks which you can use to do the batch insert.
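[Editor's note] A minimal sketch of the buffering logic such a bolt would carry, with plain stdlib types standing in for Storm's Tuple and the DB call (BatchBuffer and flushBatch are hypothetical names, not Storm APIs):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a batching bolt's core logic: buffer incoming tuples and
// flush either when the batch is full or when a periodic tick arrives.
public class BatchBuffer {
    private final int maxSize;
    private final List<String> buffer = new ArrayList<>();
    // Stands in for the batch inserts actually sent to the DB.
    private final List<List<String>> flushed = new ArrayList<>();

    public BatchBuffer(int maxSize) { this.maxSize = maxSize; }

    // Called for a normal tuple: buffer it, flush if the batch is full.
    public void onTuple(String value) {
        buffer.add(value);
        if (buffer.size() >= maxSize) flush();
    }

    // Called for a tick tuple: flush whatever is pending, so nothing
    // sits in the buffer longer than one tick interval.
    public void onTick() {
        if (!buffer.isEmpty()) flush();
    }

    private void flush() {
        flushed.add(new ArrayList<>(buffer)); // hypothetical stand-in for the DB batch insert
        buffer.clear();
    }

    public List<List<String>> getFlushed() { return flushed; }
}
```

In an actual bolt, execute() would route to the tick branch when the tuple's source component is Constants.SYSTEM_COMPONENT_ID, and returning Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS from that bolt's getComponentConfiguration() should make the ticks apply to that bolt only, which also answers the per-bolt question above.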



On Fri, Jul 11, 2014 at 1:35 PM, 이승진 <sweetest...@navercorp.com> wrote:


Hi all,
Assume that there are one spout and one bolt.
and spout emits a single log, and bolt stores it into DB for example.


and they are connected using shuffle grouping.
It comes to my mind that if a bolt can wait for 10 tuples to come from spout, 
and request batch insert to DB, it might be beneficial performancewise.


Is there any way to make a bolt gather tuples until certain size and do it's 
job?

 




Re: storm crashes when one of the zookeeper server dies

2014-07-04 Thread 이승진
Actually, it was not just a zookeeper process going down: the whole machine died due to a kernel panic, and ping to that server failed accordingly. All servers were connected to each other and in replication mode, of course.
-Original Message-
From: Irek Khasyanov <qua...@gmail.com>
To: user <user@storm.incubator.apache.org>;
이승진 <sweetest...@navercorp.com>;
Cc: 
Sent: 2014-07-04 (금) 14:51:00
Subject: Re: storm crashes when one of the zookeeper server dies
 
Are you sure all zookeeper servers are in replication mode and can connect to each other? Yesterday we added a zookeeper cluster and checked what happens with storm when one zookeeper fails. Everything was good; nothing happened to the topology.


On 4 July 2014 08:26, 이승진 <sweetest...@navercorp.com> wrote:

In storm.yaml, we listed 3 zookeeper servers:

storm.zookeeper.servers:
  - "host1"
  - "host2"
  - "host3"

and host1 died unexpectedly this morning.
Since then, not only can I not connect to the Storm UI (TTransportException), but I also can't execute any storm command.
I was quite worried when this happened, because if this is how Storm is supposed to behave, one of the zookeeper servers can be a SPOF.

It seems like it should be fixed ASAP.
Sincerely,


 -- 
With best regards, Irek Khasyanov.





KafkaSpout tps decreases as time passes

2014-07-03 Thread 이승진
Hi all,
 
I have a topology which includes only a single spout reading from 3 partitions of one topic.
 
What I'm seeing in the Storm UI is quite weird:
 
in the first 10 minutes, in its early phase, tps is about 16~17k,
 
but after running it for 2~3 hours, tps drops to 5~6k.
 
There are more than 900M logs in that topic, and I read it from the beginning.
 
The strange thing is that only one of the worker machines' memory consumption is very high.
 
It grows continuously even though I set the worker JVM's Xmx (sorry, I don't remember the exact name of that configuration) to half of physical memory.
 
But the other worker machines use only about 10% of their physical memory.
 
Can you guess any reason for what's happening to me?
 
 
Sincerely,
 
 


storm crashes when one of the zookeeper server dies

2014-07-03 Thread 이승진
In storm.yaml, we listed 3 zookeeper servers:
 
storm.zookeeper.servers:
  - "host1"
  - "host2"
  - "host3"
 
and host1 died unexpectedly this morning.
 
Since then, not only can I not connect to the Storm UI (TTransportException), but I also can't execute any storm command.
 
I was quite worried when this happened, because if this is how Storm is supposed to behave, one of the zookeeper servers can be a SPOF.
 
It seems like it should be fixed ASAP.
 
Sincerely,


Re: Spout fail logic

2014-07-03 Thread 이승진
No, Storm does not take care of replaying the message.
 
It just calls the spout's fail method.
 
When you look at KafkaSpout's fail method, there's logic for the failure case:
 
it fetches a buffer's worth of data again starting from the failed offset, checks whether each message was already processed, and if not, replays it.
 
In short, exactly-once message processing is not guaranteed by Storm itself; KafkaSpout implements its own replay logic.
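[Editor's note] The bookkeeping described above can be sketched with plain collections: track which offsets are in flight, and on a failure re-read from the failed offset onward, skipping anything already acked. This is a rough illustrative model with made-up names, not KafkaSpout's actual code:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of KafkaSpout-style replay: on fail(offset), re-emit
// every message from that offset onward that has not already been acked.
public class ReplayTracker {
    private final Set<Long> acked = new HashSet<>();
    private long maxEmitted = -1;

    public void emitted(long offset) { maxEmitted = Math.max(maxEmitted, offset); }

    public void ack(long offset) { acked.add(offset); }

    // Offsets to replay after a failure: everything from failedOffset up to
    // the highest emitted offset, minus those already processed (acked).
    public List<Long> replayFrom(long failedOffset) {
        List<Long> toReplay = new ArrayList<>();
        for (long o = failedOffset; o <= maxEmitted; o++) {
            if (!acked.contains(o)) toReplay.add(o);
        }
        return toReplay;
    }
}
```

The "skip if already processed" check is what keeps the replay from duplicating work, though messages can still be delivered more than once, i.e. the guarantee is at-least-once.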


-Original Message-
From: <francesco.masu...@pronetics.it>
To: <user@storm.incubator.apache.org>;
Cc: 
Sent: 2014-07-03 (목) 22:45:27
Subject: Re: Spout fail logic




Storm will take care of the replay of the message 

 

Francesco Maria Masucci

 

 

Il 2014-07-03 15:15 Tomas Mazukna ha scritto:


If a spout receives fail on the message, will storm replay the message, or does the spout need to re-emit it again?
--
Tomas Mazukna
 

 

 





RE: storm 0.9.2-incubating, nimbus is not launching.

2014-07-01 Thread 이승진
Finally found the reason.
Quick solution: remove the storm node recursively from your zookeeper and rerun.
Old data in the zookeeper storm node conflicts with the newly installed storm, and that causes the serialVersionUID mismatch.
Referred to https://github.com/Parsely/streamparse/issues/27

-Original Message-
From: 이승진 <sweetest...@navercorp.com>
To: <user@storm.incubator.apache.org>;
Cc: 
Sent: 2014-07-01 (화) 11:55:04
Subject: storm 0.9.2-incubating, nimbus is not launching.
 
Hi all,
 
Today I tried to upgrade the storm version to 0.9.2-incubating, but I think it has a problem with zookeeper.
 
When I run nimbus with the storm nimbus command, it returns the following error and the process halts.
 
I'm using zookeeper 3.4.4; is there any compatibility issue?
 
java.lang.RuntimeException: java.io.InvalidClassException: backtype.storm.daemon.common.SupervisorInfo; local class incompatible: stream classdesc serialVersionUID = 7648414326720210054, local class serialVersionUID = 7463898661547835557
	at backtype.storm.utils.Utils.deserialize(Utils.java:93) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	...

storm netty connection failed error

2014-07-01 Thread 이승진
Hi all,
 
I just upgraded Storm to 0.9.2-incubating (previously 0.9.0.1), launched a simple topology, and got an error related to Netty.
 
Do I have to install Netty separately, or is there some other reason for the exception below?
 
Any comments would help; thanks in advance.
 
 
2014-07-01 15:04:48 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-some-ip:6703
java.nio.channels.UnresolvedAddressException: null
	at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_60]
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) ~[na:1.7.0_60]
	at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.connect(NioClientSocketPipelineSink.java:140) ~[netty-3.2.2.Final.jar:na]
	at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:103) ~[netty-3.2.2.Final.jar:na]
	at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60) ~[netty-3.2.2.Final.jar:na]
	at org.jboss.netty.channel.Channels.connect(Channels.java:541) [netty-3.2.2.Final.jar:na]
	at org.jboss.netty.channel.AbstractChannel.connect(AbstractChannel.java:218) [netty-3.2.2.Final.jar:na]
	at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:227) [netty-3.2.2.Final.jar:na]
	at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:188) [netty-3.2.2.Final.jar:na]
	at backtype.storm.messaging.netty.Client.connect(Client.java:147) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.messaging.netty.Client.access$000(Client.java:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.messaging.netty.Client$1.run(Client.java:104) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_60]
	at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_60]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_60]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) [na:1.7.0_60]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_60]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_60]
	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]


lack of information on storm 0.9.2 UI

2014-07-01 Thread 이승진

Hi all, 
 
Recently I upgraded storm to 0.9.2, and on the Storm UI, after launching a topology, only the information above is shown.
 
Information about each bolt and worker seems to be missing; is it supposed to be like this in 0.9.2?
 
thanks


storm 0.9.2-incubating, nimbus is not launching.

2014-06-30 Thread 이승진
Hi all,
 
Today I tried to upgrade the storm version to 0.9.2-incubating, but I think it has a problem with zookeeper.
 
When I run nimbus with the storm nimbus command, it returns the following error and the process halts.
 
I'm using zookeeper 3.4.4; is there any compatibility issue?
 
java.lang.RuntimeException: java.io.InvalidClassException: backtype.storm.daemon.common.SupervisorInfo; local class incompatible: stream classdesc serialVersionUID = 7648414326720210054, local class serialVersionUID = 7463898661547835557
	at backtype.storm.utils.Utils.deserialize(Utils.java:93) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.cluster$maybe_deserialize.invoke(cluster.clj:200) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.cluster$mk_storm_cluster_state$reify__2284.supervisor_info(cluster.clj:299) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_25]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_25]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_25]
	at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_25]
	at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.5.1.jar:na]
	at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.5.1.jar:na]
	at backtype.storm.daemon.nimbus$all_supervisor_info$fn__4715.invoke(nimbus.clj:277) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at clojure.core$map$fn__4207.invoke(core.clj:2487) ~[clojure-1.5.1.jar:na]
	at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
	at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
	at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
	at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
	at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
	at clojure.core$mapcat.doInvoke(core.clj:2514) ~[clojure-1.5.1.jar:na]
	at clojure.lang.RestFn.invoke(RestFn.java:423) ~[clojure-1.5.1.jar:na]
	at backtype.storm.daemon.nimbus$all_supervisor_info.invoke(nimbus.clj:275) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.daemon.nimbus$all_scheduling_slots.invoke(nimbus.clj:288) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.daemon.nimbus$compute_new_topology__GT_executor__GT_node_PLUS_port.invoke(nimbus.clj:580) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:660) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at clojure.lang.RestFn.invoke(RestFn.java:410) ~[clojure-1.5.1.jar:na]
	at backtype.storm.daemon.nimbus$fn__5210$exec_fn__1396__auto5211$fn__5216$fn__5217.invoke(nimbus.clj:905) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.daemon.nimbus$fn__5210$exec_fn__1396__auto5211$fn__5216.invoke(nimbus.clj:904) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:724) ~[na:1.7.0_25]
Caused by: java.io.InvalidClassException: backtype.storm.daemon.common.SupervisorInfo; local class incompatible: stream classdesc serialVersionUID = 7648414326720210054, local class serialVersionUID = 7463898661547835557
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) ~[na:1.7.0_25]
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) ~[na:1.7.0_25]
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) ~[na:1.7.0_25]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769) ~[na:1.7.0_25]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348) ~[na:1.7.0_25]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) ~[na:1.7.0_25]
	at backtype.storm.utils.Utils.deserialize(Utils.java:89) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	... 29 common frames omitted


storm-kafka : EOFException: Received -1 when reading from channel, socket has likely been closed.

2014-06-22 Thread 이승진
I'm trying to use storm-kafka 0.9.0-wip16a-scala292.
Below is my spout config:

List<HostPort> hosts = new ArrayList<HostPort>();
hosts.add(new HostPort("host1", 2181));
hosts.add(new HostPort("host2", 2181));
SpoutConfig config = new SpoutConfig(
        new KafkaConfig.StaticHosts(hosts, 3),
        "logs",
        "/storm",  // path where brokers and consumers exist, e.g. /storm/brokers/topics/... and /storm/consumers/...
        topologyName);
normalLogSpoutConfig.forceStartOffsetTime(-1);
KafkaSpout normalLogSpout = new KafkaSpout(normalLogSpoutConfig);

I got this error and the worker dies:

INFO  kafka.consumer.SimpleConsumer - Reconnect in get offetset request due to socket error: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.

I'm using kafka_2.9.2-0.8.1, so is it a version problem?
Or am I using the wrong path?
 

RE: storm-kafka : EOFException: Received -1 when reading from channel, socket has likely been closed.

2014-06-22 Thread 이승진
> hi, you can use this version:
Thanks, I already tried that one and it worked fine,
but I'm afraid of using it because it's not official, and wurstmeister said he does not maintain it anymore.
Anyway, thanks for the reply.
Sincerely
 
-Original Message-
From: 傅駿浩 <jamesw...@yahoo.com.tw>
To: user@storm.incubator.apache.org <user@storm.incubator.apache.org>;
이승진 <sweetest...@navercorp.com>;
Cc: 
Sent: 2014-06-23 (월) 12:59:43
Subject: RE: storm-kafka : EOFException: Received -1 when reading from channel, 
socket has likely been closed.
 
Hi, you can use this version:

<dependency>
  <groupId>net.wurstmeister.storm</groupId>
  <artifactId>storm-kafka-0.8-plus</artifactId>
  <version>0.4.0</version>
</dependency>

This version works well with kafka 0.8.1.1 and storm 0.9.1, but with some bugs. If you want, you can git clone the latest version from github: https://github.com/apache/incubator-storm
But you need to change your storm version to the latest.
 
James 



Can I not use tuple timeout?

2014-06-18 Thread 이승진
Dear all,
 
AFAIK, the spout "assumes" processing of a tuple has failed when it does not receive an ack within the timeout interval.
 
But I found that even when the spout's fail method is called, that tuple is still running through the topology, i.e. other bolts are still processing it.
 
So actually the failed tuple is not failed, but just delayed.
 
I think I could configure a bigger timeout value, but I want to know if there's a way to avoid using the spout's timeout.
 
Sincerely
 






getting tuple by messageId

2014-06-18 Thread 이승진
Hi all,
I'm using BaseRichSpout, and to my knowledge I have to implement the replay logic to guarantee message processing.
When I look at the fail method of BaseRichSpout,

public void fail(java.lang.Object msgId)

it just gives me the messageId, and of course I set the collector in the open method.
So I think that if I make the fail method do collector.emit(failed tuple), I can guarantee exactly-once message processing.
Is there any way to get a tuple in the tuple tree by its id, or do I have to fetch it again from some source?
Thanks
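[Editor's note] Storm itself keeps only the message ID, so a spout that wants the full tuple back in fail() must either re-fetch it from the source or cache it itself. One common pattern, sketched with plain types (PendingCache is a hypothetical name, not a Storm API): keep the emitted values in a map keyed by message ID, re-emit from the cache in fail(), and evict in ack():

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of spout-side replay bookkeeping: cache each emitted tuple's
// values under its message ID so fail(msgId) can re-emit them.
public class PendingCache {
    private final Map<Object, List<Object>> pending = new HashMap<>();

    // Record the values alongside the message ID at emit time.
    public void onEmit(Object msgId, List<Object> values) {
        pending.put(msgId, values);
    }

    // ack(): the tuple tree completed, so drop the cached copy.
    public void onAck(Object msgId) {
        pending.remove(msgId);
    }

    // fail(): return the cached values so the spout can emit them again
    // (null means the ID is unknown, e.g. already acked).
    public List<Object> onFail(Object msgId) {
        return pending.get(msgId);
    }
}
```

Note that re-emitting on fail gives at-least-once rather than exactly-once processing, since the tuple may have been partially processed before the failure.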