Do you have
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
in your config file?

Fang, Yan
yanfang...@gmail.com

On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur <ctip...@gmail.com> wrote:

> Yi/Milinda,
>
> I am trying to initialize a kv store. I have the following properties
> defined:
>
> stores.store-name.key.serde=json
> stores.store-name.msg.serde=json
> stores.store-name.changelog=argos.windowchangelog
>
> How do I define a key serde as I am getting this exception:
>
> Exception in thread "main" org.apache.samza.SamzaException: Must define a
> key serde when using key value storage.
>   at org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
>   at org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
>   at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
>   at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
>   at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>   at scala.collection.SetLike$class.map(SetLike.scala:93)
>   at scala.collection.AbstractSet.map(Set.scala:47)
>   at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
>   at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
>   at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
>   at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
>   at org.apache.samza.job.JobRunner.main(JobRunner.scala)
>
> On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Yi,
> >
> > My use case is more of the latter. Your explanation makes sense now. I
> > was also looking into Milinda's wiki. She has a section for Kafka
> > partition SimplePartitioner, which is simple enough as well.
> >
> > Thanks for all the inputs. Let me see what I come up with while
> > implementing it.
> >
> > - Shekar
> >
> > On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan <nickpa...@gmail.com> wrote:
> >
> >> Hi, Shekar,
> >>
> >> First, I would like to clarify what you meant by sliding window: is it
> >> defined as windows with size N and advance step size of 1 (which means
> >> that windows overlap and each input message would contribute to
> >> multiple counts in different windows)? Or windows with size N and
> >> advance step size of N (i.e. each incoming message only contributes to
> >> one counter in a single window)?
> >>
> >> If your use case falls into the first category, you will need something
> >> more sophisticated as discussed in SAMZA-552.
> >> If your use case is the second one, there could be a simpler version
> >> of SAMZA-552 that you can go with:
> >>
> >> 1) Initiate a KV-store that uses the application name as the key
> >> 2) For each incoming message, look up the window that the message
> >>    belongs to by the application name
> >> 3) Update the counter and update the value in the KV-store based on
> >>    the application name
> >> 4) Every 5 min, when the window() method is triggered, set all counters
> >>    to zero (this can also be done lazily, by keeping the last reset
> >>    timestamp in the record in the KV-store, keyed by application name.
> >>    Then the counter can be reset to zero the next time the application
> >>    counter is updated)
> >>
> >> Hope that makes sense.
> >>
> >> -Yi
> >>
> >> On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur <ctip...@gmail.com>
> >> wrote:
> >>
> >> > Benjamin,
> >> >
> >> > Thanks for the explanation. We don't have any specific partition
> >> > scheme as yet. We just have 2 topics - raw and processed - and we
> >> > use the default partitioning scheme.
> >> > Can you share any code snippet so I can understand it better?
> >> >
> >> > - Shekar
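
For reference, steps 1) to 4) in Yi's reply can be sketched roughly as below. This is only an illustration in plain Python, not Samza code: a dict stands in for the KV store, and the names `WindowedCounter`, `CounterRecord`, `process` and `window` are hypothetical. It also assumes that windows are aligned to multiples of the window size, which the thread does not specify.

```python
from dataclasses import dataclass

WINDOW_MS = 5 * 60 * 1000  # the 5-minute window from step 4


@dataclass
class CounterRecord:
    count: int            # messages counted in the current window
    window_start_ms: int  # start of that window (the "last reset" timestamp)


class WindowedCounter:
    """Per-application counter over non-overlapping windows (hypothetical helper)."""

    def __init__(self, window_ms=WINDOW_MS):
        self.window_ms = window_ms
        # Step 1: a dict stands in for the KV store, keyed by application name.
        self.store = {}

    def _window_start(self, now_ms):
        # Assumption: windows are aligned to multiples of window_ms.
        return now_ms - (now_ms % self.window_ms)

    def process(self, app_name, now_ms):
        start = self._window_start(now_ms)
        record = self.store.get(app_name)  # Step 2: look up by application name
        if record is None or record.window_start_ms != start:
            # Lazy reset: the stored count belongs to a different window.
            record = CounterRecord(count=0, window_start_ms=start)
        record.count += 1                  # Step 3: update the counter ...
        self.store[app_name] = record      # ... and write it back to the store
        return record.count

    def window(self, now_ms):
        # Step 4 (eager variant): set every counter to zero when the window fires.
        start = self._window_start(now_ms)
        for app_name in list(self.store):
            self.store[app_name] = CounterRecord(count=0, window_start_ms=start)


counter = WindowedCounter(window_ms=1000)
print(counter.process("app-a", 100))   # 1
print(counter.process("app-a", 900))   # 2
print(counter.process("app-a", 1000))  # 1 (new window, counter reset lazily)
```

With the lazy reset in `process`, calling `window` is optional. It is kept here only to show the eager variant Yi describes first.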