Thanks for replying! Can you please suggest changes in the attached code. Also in the topology, I have mentioned in config the custom class for value and serializer logic for that:
*conf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG,"{" + "\"valueClass\":\"org.apache.storm.state.OurCustomPair\","+ "\"valueSerializerClass\":\"org.apache.storm.state.OurStateSerializer\"}");* On Sun, Mar 12, 2017 at 12:12 AM, Arun Mahadevan <ar...@apache.org> wrote: > Is your kryo instance wrapped in a thread local like in the > DefaultStateSerializer? > > > > Also see - https://github.com/EsotericSoftware/kryo#threading > > > > Arun > > > > *From: *anshu shukla <anshushuk...@gmail.com> > *Reply-To: *"user@storm.apache.org" <user@storm.apache.org> > *Date: *Saturday, March 11, 2017 at 8:16 PM > *To: *"user@storm.apache.org" <user@storm.apache.org> > *Subject: *Writing custom Serializer Stateful bolt > > > > *Hello,* > > > > Is there any source to get the clear idea for writing the *serializer for > the custom class object to Redis. * > > After trying a lot I am unable to find the reason behind this exception. > > > > *===================================================* > > *com.esotericsoftware.kryo.KryoException: Buffer underflow.* > > *Serialization trace:* > > input (org.apache.storm.state.OurCustomPair) > > at com.esotericsoftware.kryo.io.Input.require(Input.java:199) > ~[kryo-3.0.3.jar:?] > > at com.esotericsoftware.kryo.io.Input.readAscii_slow(Input.java:616) > ~[kryo-3.0.3.jar:?] > > at com.esotericsoftware.kryo.io.Input.readAscii(Input.java:594) > ~[kryo-3.0.3.jar:?] > > at com.esotericsoftware.kryo.io.Input.readString(Input.java:472) > ~[kryo-3.0.3.jar:?] > > at com.esotericsoftware.kryo.util.DefaultClassResolver. > readName(DefaultClassResolver.java:150) ~[kryo-3.0.3.jar:?] > > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133) > ~[kryo-3.0.3.jar:?] > > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) > ~[kryo-3.0.3.jar:?] > > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118) > ~[kryo-3.0.3.jar:?] > > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551) > ~[kryo-3.0.3.jar:?] > > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:686) > ~[kryo-3.0.3.jar:?] > > at > org.apache.storm.state.OurStateSerializer.deserialize(OurStateSerializer.java:99) > ~[classes/:?] > > at > org.apache.storm.redis.state.RedisKeyValueState.get(RedisKeyValueState.java:143) > ~[classes/:?] > > at org.apache.storm.spout.CheckpointSpout.open(CheckpointSpout.java:78) > ~[classes/:?] > > at org.apache.storm.spout.CheckpointSpout.open(CheckpointSpout.java:67) > ~[classes/:?] > > at org.apache.storm.daemon.executor$fn__5022$fn__5037.invoke(executor.clj:600) > ~[classes/:?] > > at org.apache.storm.util$async_loop$fn__655.invoke(util.clj:482) > [classes/:?] > > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] > > at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79] > > > > -- > > Thanks & Regards, > Anshu Shukla > -- Thanks & Regards, Anshu Shukla
OurCustomPair.java
Description: Binary data
OurStateSerializer.java
Description: Binary data