Re: Samza and sliding window
Yeah, that's why I added some test code in window() that calls store.all() and iterates through the results. I traced into it in my local environment and verified that the iterator works with store.all(). -Yi
On Thu, Jul 23, 2015 at 4:26 PM, Shekar Tippur ctip...@gmail.com wrote: [...]
Re: Samza and sliding window
Yi, in my case, I am able to append to the key, but I am not able to get the store and iterate through it. If you look at http://pastebin.com/fKGpHwW6, at line 146 I am able to get the store value, but in the window routine (line 187) I am unable to get the values from the store. - Shekar
Re: Samza and sliding window
Yi, I am not sure I see an attachment. Is it possible to paste that on pastebin? Shekar
On Jul 21, 2015 4:27 PM, Yi Pan nickpa...@gmail.com wrote: [...]
Re: Samza and sliding window
Yi, I am not sure if attachments get filtered when sent to the group. I have seen this earlier as well. - Shekar
Re: Samza and sliding window
Hi, Shekar, Here it is: http://pastebin.com/fKGpHwW6 -Yi
On Wed, Jul 22, 2015 at 8:05 AM, Shekar Tippur ctip...@gmail.com wrote: [...]
Re: Samza and sliding window
Thanks Yi. I got the pastebin link. I am looking at it. Shekar
On Jul 22, 2015 5:09 PM, Yi Pan nickpa...@gmail.com wrote: Hmm... did you get my pastebin post? If not, I can send the diff directly to your gmail account.
On Wed, Jul 22, 2015 at 11:23 AM, Shekar Tippur ctip...@gmail.com wrote: [...]
Re: Samza and sliding window
Hi, Shekar, I have stripped down your use case to just the KV-store operation and have verified that it works fine. Please see the attached diff file. If you have any further questions, please let me know. -Yi
On Mon, Jul 20, 2015 at 12:35 PM, Shekar Tippur ctip...@gmail.com wrote: Yi, Here is the config: http://pastebin.com/mCALEACs - Shekar
On Mon, Jul 20, 2015 at 12:27 PM, Yi Pan nickpa...@gmail.com wrote: [...]
Re: Samza and sliding window
Yi, Thanks a lot. - Shekar
Re: Samza and sliding window
Hi, Shekar, It would also help if you could post your job configuration on pastebin so that I can test with the same config. Thanks! -Yi
On Mon, Jul 20, 2015 at 11:11 AM, Shekar Tippur ctip...@gmail.com wrote: [...]
Re: Samza and sliding window
Any takers on this please? - Shekar
Re: Samza and sliding window
Hi, Shekar, If possible, could you share your code somewhere? I can try to dig into it this weekend. Thanks! -Yi
On Fri, Jul 17, 2015 at 1:31 PM, Shekar Tippur ctip...@gmail.com wrote: [...]
Re: Samza and sliding window
Any takers on this please? - Shekar
On Fri, Jul 3, 2015 at 9:46 AM, Shekar Tippur ctip...@gmail.com wrote: [...]
Re: Samza and sliding window
Any answer on how to get all the KV values and reinitialise the KV store? I have one more question on implementing a sliding window. If I use a KV store like RocksDB and run on YARN (say, a 3-node cluster), the aggregation job gets distributed as well, so I am guessing the aggregation numbers get skewed. Is that a correct assessment?
On Thu, Jul 2, 2015 at 5:47 PM, Shekar Tippur ctip...@gmail.com wrote: [...]
Re: Samza and sliding window
Hi, Shekar, Sorry I was not able to follow up with you in time. It is great that you found the configuration problem and made it work! As for the exception on the iterator, could you send us the log with the exception? Thanks! -Yi
On Thu, Jul 2, 2015 at 4:36 PM, Shekar Tippur ctip...@gmail.com wrote: [...]
Re: Samza and sliding window
Yi, looks like it is working now. There was a redundant line in the config. I am able to initialize the KV store and add values. In the window code, I am unable to retrieve them and mark them as 0. Here is my window code:

    public void window(MessageCollector collector, TaskCoordinator coordinator) {
        //store.delete(appName);
        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
        KeyValueIterator<String, String> i = store.all();
        while (i.hasNext()) {
            Entry<String, String> next = i.next();
            log.info("Trying to remove Key", next.getKey());
            //i.remove();
        }
        eventsSeen = 0;
        i.close();
    }

How do I retrieve the key, and is there a way to remove it? i.remove() throws an exception. - Shekar
On Wed, Jul 1, 2015 at 7:25 PM, Shekar Tippur ctip...@gmail.com wrote: Yi, Here is my config file: http://pastebin.com/Kf3C9E0h - S
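The exception from i.remove() suggests that the store's iterator does not support removal (an assumption: Samza's RocksDB-backed KeyValueIterator appears to throw UnsupportedOperationException from remove()). A common workaround is to finish iterating, close the iterator, and only then call store.delete(key) for each key collected along the way. The sketch below models that pattern in plain Java so it runs without Samza; KvStore, InMemoryKvStore and WindowDrain are hypothetical stand-ins, not Samza classes.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the subset of Samza's KeyValueStore used below.
// (Samza's real all() returns a KeyValueIterator that must also be closed.)
interface KvStore<K, V> {
    Iterator<Map.Entry<K, V>> all();
    void put(K key, V value);
    void delete(K key);
}

// In-memory stand-in whose iterator rejects remove(), like the one in the thread.
class InMemoryKvStore<K, V> implements KvStore<K, V> {
    private final TreeMap<K, V> data = new TreeMap<>();

    public Iterator<Map.Entry<K, V>> all() {
        // The unmodifiable view makes iterator.remove() throw UnsupportedOperationException.
        return Collections.unmodifiableMap(data).entrySet().iterator();
    }

    public void put(K key, V value) { data.put(key, value); }

    public void delete(K key) { data.remove(key); }
}

class WindowDrain {
    // Reads every entry first, then deletes the keys once iteration has finished,
    // instead of calling remove() on the iterator.
    static <K, V> List<Map.Entry<K, V>> drain(KvStore<K, V> store) {
        List<Map.Entry<K, V>> snapshot = new ArrayList<>();
        Iterator<Map.Entry<K, V>> it = store.all();
        while (it.hasNext()) {
            Map.Entry<K, V> e = it.next();
            // Copy key and value so the snapshot does not depend on the iterator.
            snapshot.add(new AbstractMap.SimpleImmutableEntry<K, V>(e.getKey(), e.getValue()));
        }
        // With Samza's KeyValueIterator, close() it here (ideally in a finally block).
        for (Map.Entry<K, V> e : snapshot) {
            store.delete(e.getKey());
        }
        return snapshot;
    }
}
```

In a real window(), the same loop would run over store.all() (a KeyValueIterator<String, String> yielding Entry<String, String>), with i.close() before the deletes. If process() treats a missing key as a count of 0, deleting the keys also serves as the reset.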
Re: Samza and sliding window
Yi, there is no exception. I want to do a couple of things in the window:
- Get all the keys and values and publish them to another store (like Graphite) as a list.
- Remove all entries.
I can iterate through the list later, but I want to be able to get all KV values and delete all of them in an atomic operation. How do I do these operations on the KV store? - S
On Thu, Jul 2, 2015 at 4:59 PM, Yi Pan nickpa...@gmail.com wrote: [...]
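On the atomicity question: as far as I know, Samza's KeyValueStore has no single call that reads and clears the whole store atomically. Within one task, however, window() and process() are not run concurrently under Samza's default single-threaded run loop (an assumption worth checking for your version), so a read-then-delete pass inside window() sees no interleaved updates from that task. For the publishing half, Graphite's plaintext protocol takes one `<metric path> <value> <unix timestamp>` line per data point. The sketch below formats a snapshot of counts that way; the GraphiteLines class, the `apps` prefix and the character-replacement rule are hypothetical choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GraphiteLines {
    // Builds Graphite plaintext lines ("<path> <value> <epochSeconds>") from a snapshot of counts.
    static List<String> format(String prefix, Map<String, Integer> counts, long epochSeconds) {
        List<String> lines = new ArrayList<>();
        // Iterate in sorted key order so the output is deterministic.
        for (Map.Entry<String, Integer> e : new TreeMap<>(counts).entrySet()) {
            // Graphite uses '.' as the path separator and spaces between fields,
            // so replace both in application names (hypothetical sanitizing rule).
            String name = e.getKey().replace('.', '_').replace(' ', '_');
            lines.add(prefix + "." + name + " " + e.getValue() + " " + epochSeconds);
        }
        return lines;
    }
}
```

Each line, terminated by a newline, would then be written to Graphite's plaintext listener (port 2003 by default).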
Re: Samza and sliding window
Also, next.getValue() or next.getKey() does not yield anything:

    KeyValueIterator<String, String> i = store.all();
    while (i.hasNext()) {
        Entry<String, String> next = i.next();
        log.info("Removed Key", next.getValue());
    }

On Thu, Jul 2, 2015 at 5:36 PM, Shekar Tippur ctip...@gmail.com wrote: [...]
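One possible explanation for the empty output, assuming `log` is an SLF4J Logger (its declaration is not shown in the thread): `log.info("Removed Key", next.getValue())` passes the value as an argument to a format string that contains no `{}` placeholder, and SLF4J silently drops such arguments, so only the literal text is logged. The key and value would then look empty even though the iterator works. The corrected call would be `log.info("Removed key {} with value {}", next.getKey(), next.getValue());`. The sketch below is a simplified, hypothetical model of that placeholder rule (it does not reproduce SLF4J's escaping or exception handling), just to make the behavior concrete.

```java
class PlaceholderDemo {
    // Simplified model of SLF4J-style formatting: each "{}" is replaced by the next
    // argument; arguments without a matching "{}" are dropped.
    static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int argIndex = 0;
        int from = 0;
        int at;
        while ((at = pattern.indexOf("{}", from)) >= 0 && argIndex < args.length) {
            out.append(pattern, from, at).append(args[argIndex++]);
            from = at + 2;
        }
        // Copy whatever text remains after the last substituted placeholder.
        out.append(pattern.substring(from));
        return out.toString();
    }
}
```

With this model, `format("Removed Key", "A")` yields just `Removed Key`, while `format("Removed key {} with value {}", "A", 100)` yields `Removed key A with value 100`.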
Re: Samza and sliding window
Yi/Milinda, I am trying to initialize a KV store. I have the following properties defined:

    stores.store-name.key.serde=json
    stores.store-name.msg.serde=json
    stores.store-name.changelog=argos.windowchangelog

How do I define a key serde? I am getting this exception:

    Exception in thread "main" org.apache.samza.SamzaException: Must define a key serde when using key value storage.
        at org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
        at org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
        at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
        at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
        at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
        at scala.collection.SetLike$class.map(SetLike.scala:93)
        at scala.collection.AbstractSet.map(Set.scala:47)
        at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
        at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
        at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
        at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
        at org.apache.samza.job.JobRunner.main(JobRunner.scala)

On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur ctip...@gmail.com wrote: Yi, my use case is more of the latter. Your explanation makes sense now. I was also looking into Milinda's wiki. She has a section on Kafka partitioning with SimplePartitioner, which is simple enough as well. Thanks for all the inputs. Let me see what I come up with while implementing it. - Shekar
On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan nickpa...@gmail.com wrote: Hi, Shekar, First, I would like to clarify what you mean by sliding window. Is it defined as windows of size N with an advance step of 1 (which means that windows overlap and each input message contributes to counts in several different windows)? Or windows of size N with an advance step of N (i.e., each incoming message contributes to only one counter in a single window)? If your use case falls into the first category, you will need something more sophisticated, as discussed in SAMZA-552. If your use case is the second one, there is a simpler version of SAMZA-552 that you can go with:
1) Initialize a KV store that uses the application name as the key.
2) For each incoming message, look up the window counter for the message by its application name.
3) Update the counter and write the value back to the KV store under the application name.
4) Every 5 min, when the window() method is triggered, set all counters to zero. (This can also be done lazily by keeping the last reset timestamp in each record in the KV store, keyed by application name. Resetting the counter to zero can then happen the next time that application's counter is updated.)
Hope that makes sense. -Yi
On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur ctip...@gmail.com wrote: Benjamin, thanks for the explanation. We don't have any specific partition scheme yet. We just have 2 topics, raw and processed, and we use the default partitioning scheme. Can you share any code snippet so I can understand it better? - Shekar
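Yi's second option (window size N, step N, i.e. a tumbling window) with the lazy reset from step 4 can be sketched in plain Java as follows. Assumptions: a HashMap stands in for the KV store keyed by application name, `onMessage` plays the role of process(), and `closeWindow` plays the role of window(), which Samza calls at the interval set by `task.window.ms` (300000 for the 5-minute case). TumblingAppCounter and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class TumblingAppCounter {
    // Value stored per application: the window it was last updated in, and its count.
    static final class Record {
        final long windowStart;
        final int count;
        Record(long windowStart, int count) { this.windowStart = windowStart; this.count = count; }
    }

    private final Map<String, Record> store = new HashMap<>();
    private long currentWindowStart;

    TumblingAppCounter(long startMillis) { this.currentWindowStart = startMillis; }

    // Per incoming message (process() in a StreamTask).
    void onMessage(String appName) {
        Record r = store.get(appName);
        // Lazy reset: a record left over from an earlier window starts again from zero.
        int base = (r == null || r.windowStart != currentWindowStart) ? 0 : r.count;
        store.put(appName, new Record(currentWindowStart, base + 1));
    }

    // When the window fires (window() in a WindowableTask): returns the counts of the
    // window that just closed and starts a new one. Nothing is written to the store.
    Map<String, Integer> closeWindow(long nowMillis) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Record> e : store.entrySet()) {
            if (e.getValue().windowStart == currentWindowStart) {
                counts.put(e.getKey(), e.getValue().count);
            }
        }
        currentWindowStart = nowMillis;
        return counts;
    }
}
```

Because stale records are overwritten on their next update, window() only has to read. The trade-off is that records for applications that stop sending stay in the store until cleaned up separately.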
Re: Samza and sliding window
Do you have serializers.registry.json.class =org.apache.samza.serializers.JsonSerdeFactory in your config file? Fang, Yan yanfang...@gmail.com
On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur ctip...@gmail.com wrote: [...]
Re: Samza and sliding window
Yan, yes. I do have it. - Shekar
On Wed, Jul 1, 2015 at 3:09 PM, Yan Fang yanfang...@gmail.com wrote: [...]
Re: Samza and sliding window
So do you use the store-name as the kv storage name in your StreamTask code? Fang, Yan yanfang...@gmail.com
On Wed, Jul 1, 2015 at 3:41 PM, Shekar Tippur ctip...@gmail.com wrote: [...]
Re: Samza and sliding window
I do have this in init as well... public void init(Config config, TaskContext context) { store = (KeyValueStore<String, Integer>) context.getStore("store"); } You are right. These are primitive types, but I was trying to address this exception: Exception in thread "main" org.apache.samza.SamzaException: Must define a key serde when using key value storage. I have changed it to stores.store.key.serde=org.apache.samza.serializers.ByteSerdeFactory. Regardless, the exception persists. - Shekar
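One thing worth checking against the config above: in Samza's configuration, a store's key.serde and msg.serde properties take the name of a serde, and that name is bound to a factory class through a separate serializers.registry.<name>.class entry. Pointing stores.store.key.serde directly at a factory class is therefore probably not what Samza expects. The sketch below is plain Java with no Samza dependency: StoreSerdeCheck, unregisteredSerdes, broken and fixed are hypothetical names, and the config map only mimics the properties file. The property keys and the StringSerdeFactory, IntegerSerdeFactory and RocksDbKeyValueStorageEngineFactory class names are the ones Samza 0.9 documents as far as I know; verify them against your Samza version.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StoreSerdeCheck {

    // Hypothetical helper: lists every stores.*.key.serde / stores.*.msg.serde value
    // that has no matching serializers.registry.<name>.class entry.
    static List<String> unregisteredSerdes(Map<String, String> config) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> e : config.entrySet()) {
            String key = e.getKey();
            boolean isStoreSerde = key.startsWith("stores.")
                    && (key.endsWith(".key.serde") || key.endsWith(".msg.serde"));
            if (!isStoreSerde) {
                continue;
            }
            String serdeName = e.getValue();
            if (!config.containsKey("serializers.registry." + serdeName + ".class")) {
                problems.add(key + " -> " + serdeName);
            }
        }
        return problems;
    }

    // Shape used in the message above: a factory class given where a serde name is expected.
    static Map<String, String> broken() {
        Map<String, String> c = new LinkedHashMap<>();
        c.put("stores.store.key.serde", "org.apache.samza.serializers.ByteSerdeFactory");
        return c;
    }

    // Serde names registered once, then referenced by the store (String keys, Integer values).
    static Map<String, String> fixed() {
        Map<String, String> c = new LinkedHashMap<>();
        c.put("stores.store.factory", "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
        c.put("stores.store.key.serde", "string");
        c.put("stores.store.msg.serde", "integer");
        c.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
        c.put("serializers.registry.integer.class", "org.apache.samza.serializers.IntegerSerdeFactory");
        // window() fires every 5 minutes (value in milliseconds).
        c.put("task.window.ms", String.valueOf(5 * 60 * 1000));
        return c;
    }

    public static void main(String[] args) {
        System.out.println("broken: " + unregisteredSerdes(broken()));
        System.out.println("fixed:  " + unregisteredSerdes(fixed()));
    }
}
```

The "fixed" map corresponds to properties lines such as stores.store.key.serde=string together with serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory; the two serdes match the KeyValueStore<String, Integer> cast in init.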
Re: Samza and sliding window
Hi Shekar, You can use Kafka's partitioning capabilities to partition your stream based on application. That will make sure events related to an application always end up in the same partition. With this you will have multiple applications in the same partition, and each partition will be mapped to a single Samza task instance (AFAIK, a Samza job is divided into several task instances based on the number of partitions in your topic). Then in your Samza task implementation you should maintain windowing counts for each application. Thanks Milinda -- Milinda Pathirage PhD Student | Research Assistant School of Informatics and Computing | Data to Insight Center Indiana University twitter: milindalakmal skype: milinda.pathirage blog: http://milinda.pathirage.org
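Milinda's suggestion of maintaining per-application counts inside each task can be sketched in plain Java. This is a minimal sketch, not Samza code: PerAppWindowCounter is a hypothetical class, and the HashMap stands in for whatever state the task keeps. In a real StreamTask, process() would pull the application name out of the IncomingMessageEnvelope, and window() would send the snapshot through the MessageCollector instead of returning it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the per-task state of a Samza StreamTask.
public class PerAppWindowCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    // Called once per incoming event (what process() would do).
    public void process(String applicationName) {
        counts.merge(applicationName, 1, Integer::sum);
    }

    // Called when the window fires: return the counts for the window that just
    // closed (sorted by application name) and start the next window from zero.
    public Map<String, Integer> window() {
        Map<String, Integer> snapshot = new TreeMap<>(counts);
        counts.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        PerAppWindowCounter task = new PerAppWindowCounter();
        for (String app : new String[] {"A", "B", "A", "C", "A"}) {
            task.process(app);
        }
        System.out.println(task.window()); // {A=3, B=1, C=1}
        System.out.println(task.window()); // {}
    }
}
```

Because every event for a given application lands in the same partition, each task only ever sees complete counts for the applications it owns, so no cross-task merging is needed.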
Re: Samza and sliding window
Shekar, You won't be creating a partition per application. By using the application name as the partitioning key, you ensure all events for a given application are consistently mapped to the same partition. Multiple applications will be mapped to each partition without any need for a priori knowledge of the set of applications. You then maintain counts per application in your own code. Since all events for an application are mapped to the same partition, and only one worker will consume a given partition, your counts will be complete. Make sense? b On Monday, June 29, 2015, Shekar Tippur ctip...@gmail.com wrote: Milinda, This is a stream of events where I don't know how many applications are sending events. I need to dynamically create Kafka partitions. Can you please confirm the flow: 1. New event comes in. 2. Check to see if a partition exists for the application. If not, create one. 3. Implement public static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "Application name"); 4. Apply windowing on the new topic: public void window(MessageCollector collector, TaskCoordinator coordinator) { collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen)); eventsSeen = 0; }
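Benjamin's point, that keying by application name gives a stable application-to-partition mapping without creating partitions per application, can be illustrated with a toy partitioner. With the Kafka 0.8 producer, this amounts to sending each event as a KeyedMessage whose key is the application name. The sketch below is hypothetical: KeyPartitioning and partitionFor are illustrative names, and String.hashCode modulo the partition count is only similar in spirit to what Kafka does. Kafka's partitioners use their own hash functions, so the actual partition numbers will differ. What matters is that the mapping is deterministic per key.

```java
import java.util.Arrays;
import java.util.List;

public class KeyPartitioning {
    // Hypothetical key-to-partition mapping: deterministic for a given key, and
    // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
    static int partitionFor(String applicationName, int numPartitions) {
        return Math.floorMod(applicationName.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        List<String> events = Arrays.asList("A", "B", "C", "A", "B");
        for (String app : events) {
            // Prints A -> 1, B -> 2, C -> 3, A -> 1, B -> 2
            System.out.println(app + " -> partition " + partitionFor(app, numPartitions));
        }
    }
}
```

New application names simply hash into one of the existing partitions, so the set of applications does not need to be known in advance. Many applications share a partition, and the per-application counts kept in the task separate them again.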
Re: Samza and sliding window
Benjamin, Thanks for the explanation. We don't have any specific partition scheme as yet. We just have 2 topics, raw and processed, and we use the default partitioning scheme. Can you share any code snippet so I can understand it better? - Shekar
Re: Samza and sliding window
Hi, Shekar, First, I would like to clarify what you mean by sliding window: is it defined as windows with size N and an advance step size of 1 (which means that windows overlap and each input message contributes to multiple counts in different windows)? Or windows with size N and an advance step size of N (i.e. each incoming message contributes to only one counter in a single window)? If your use case falls into the first category, you will need something more sophisticated, as discussed in SAMZA-552. If your use case is the second one, there could be a simpler version of SAMZA-552 that you can go with: 1) Create a KV-store that uses the application name as the key. 2) For each incoming message, look up the current window record for the message by its application name. 3) Update the counter and write the updated value back to the KV-store under the application name. 4) Every 5 min, when the window() method is triggered, set all counters to zero. (This can also be done lazily, by keeping the last reset timestamp in the record in the KV-store, keyed by application name. The counter can then be reset to zero the next time that application's counter is updated.) Hope that makes sense. -Yi
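Yi's steps for the non-overlapping (tumbling) case, including the lazy reset, can be sketched as follows. This is a minimal sketch under stated assumptions: LazyResetCounter and WindowRecord are hypothetical names, and a HashMap stands in for the Samza KeyValueStore so the example runs without Samza. In a real job the value type would also need a msg serde, such as JSON. Instead of a "last reset timestamp", each record stores the start of the window it belongs to, with windows aligned to multiples of 5 minutes. A record from an older window is treated as zero.

```java
import java.util.HashMap;
import java.util.Map;

public class LazyResetCounter {
    static final long WINDOW_MS = 5 * 60 * 1000L; // 5-minute tumbling window

    // Hypothetical value stored per application name: window start and count so far.
    static final class WindowRecord {
        final long windowStartMs;
        final int count;

        WindowRecord(long windowStartMs, int count) {
            this.windowStartMs = windowStartMs;
            this.count = count;
        }
    }

    // Stand-in for a KeyValueStore<String, WindowRecord> keyed by application name (step 1).
    private final Map<String, WindowRecord> store = new HashMap<>();

    // Windows aligned to multiples of WINDOW_MS (an assumption; any fixed origin works).
    static long windowStartOf(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
    }

    // Steps 2-3 with the lazy variant of step 4: a record from an older window is
    // overwritten with a fresh count instead of being reset eagerly in window().
    public int onEvent(String applicationName, long timestampMs) {
        long start = windowStartOf(timestampMs);
        WindowRecord old = store.get(applicationName);
        int count = (old != null && old.windowStartMs == start) ? old.count + 1 : 1;
        store.put(applicationName, new WindowRecord(start, count));
        return count;
    }

    // Read path: a stale record also counts as zero.
    public int currentCount(String applicationName, long nowMs) {
        WindowRecord r = store.get(applicationName);
        return (r != null && r.windowStartMs == windowStartOf(nowMs)) ? r.count : 0;
    }
}
```

The timestamp is passed in explicitly so the logic is easy to test; inside a task it could simply be System.currentTimeMillis(). The eager variant of step 4 would instead iterate the store in window(), for example with store.all(), and zero every counter.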
Re: Samza and sliding window
Hi Shekar, Please have a look at [1]. Milinda [1] https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
Re: Samza and sliding window
Yi, My use case is more of the latter. Your explanation makes sense now. I was also looking into the wiki page Milinda linked. It has a section on the Kafka SimplePartitioner, which is simple enough as well. Thanks for all the inputs. Let me see what I come up with while implementing it. - Shekar
Re: Samza and sliding window
Milinda, I see that the document you mentioned addresses windowing, but I also need to group by different applications:
Application  Count
A            100
B            40
C            69
- Shekar
Samza and sliding window
Hello, My apologies if I have raised this earlier. Here is the use case: I have a stream that is partitioned based on application name. I want to be able to count the number of events happening for each application in the past 5 minutes (sliding window) and update either another topic or a local cache. Is this possible with version 0.9 of Samza? If not, what is the easiest way to achieve this? - Shekar
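If a true sliding count over "the past 5 minutes" is needed (overlapping windows rather than one reset every 5 minutes), a common approximation is to keep one small bucket per minute and sum the last five. The sketch below is hypothetical and is not the design discussed in SAMZA-552. BucketedSlidingCounter and its methods are illustrative names, the 1-minute granularity is an assumption, and it assumes timestamps arrive in non-decreasing order per application, as they would with processing time. The result is a window that slides in 1-minute steps.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class BucketedSlidingCounter {
    static final long BUCKET_MS = 60_000L; // assumed 1-minute granularity
    static final int NUM_BUCKETS = 5;      // 5 x 1 minute = 5-minute window

    // Ring of per-minute buckets for one application.
    private static final class Ring {
        final long[] bucketIds = new long[NUM_BUCKETS]; // which minute each slot holds
        final int[] counts = new int[NUM_BUCKETS];

        Ring() {
            Arrays.fill(bucketIds, Long.MIN_VALUE); // mark every slot as empty
        }
    }

    private final Map<String, Ring> rings = new HashMap<>();

    public void onEvent(String applicationName, long timestampMs) {
        long bucket = Math.floorDiv(timestampMs, BUCKET_MS);
        int slot = (int) Math.floorMod(bucket, (long) NUM_BUCKETS);
        Ring ring = rings.computeIfAbsent(applicationName, k -> new Ring());
        if (ring.bucketIds[slot] != bucket) { // slot holds an older minute: recycle it
            ring.bucketIds[slot] = bucket;
            ring.counts[slot] = 0;
        }
        ring.counts[slot]++;
    }

    // Events in the current minute plus the previous NUM_BUCKETS - 1 minutes.
    public int countLastWindow(String applicationName, long nowMs) {
        Ring ring = rings.get(applicationName);
        if (ring == null) {
            return 0;
        }
        long current = Math.floorDiv(nowMs, BUCKET_MS);
        int total = 0;
        for (int i = 0; i < NUM_BUCKETS; i++) {
            long id = ring.bucketIds[i];
            if (id > current - NUM_BUCKETS && id <= current) {
                total += ring.counts[i];
            }
        }
        return total;
    }
}
```

Memory per application is fixed at NUM_BUCKETS counters. A finer bucket size gives a smoother slide at the cost of more buckets to sum.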
Re: Samza and sliding window
Never mind. I see it here: http://samza.apache.org/learn/documentation/0.8/container/windowing.html Thanks again Milinda. - Shekar
Re: Samza and sliding window
Thanks Milinda. Is this feature available in version 0.8 of Samza? - Shekar On Fri, Jun 26, 2015 at 11:24 AM, Milinda Pathirage mpath...@umail.iu.edu wrote: Hi Shekar, You can use Samza's local storage ( http://samza.apache.org/learn/documentation/0.9/container/state-management.html ) to keep the window state, and its windowing ( http://samza.apache.org/learn/documentation/0.9/container/windowing.html ) capabilities to handle the window advancement. During advancement you can update the local cache (Redis in your case). AFAIK, Samza doesn't provide any helpers or utilities to handle window state maintenance. You have to implement it on top of local storage, or if you don't want fault tolerance you can keep the state in memory too (as long as the state fits in memory). Thanks Milinda On Fri, Jun 26, 2015 at 1:53 PM, Shekar Tippur ctip...@gmail.com wrote: Yan, *What do you mean by a local cache? Is it a db like MySQL, something like RocksDB, or even just in-memory?* Local cache as in Redis. *When you say another topic, is this the topic consumed by the same Samza job as your 5-minutes-job, or in a separate job? What is the relation between the topic and the application name?* We don't have a 5 min job. All we have now is a stream of events coming from a bunch of applications. All these land on a raw Kafka topic. The stream data has the application name. I want to create a job that takes the incoming stream, groups it by application name and counts the number of events we get in a 5 min sliding window. - Shekar On Fri, Jun 26, 2015 at 10:29 AM, Yan Fang yanfang...@gmail.com wrote: Hi Shekar, Need a little more clarification. What do you mean by a local cache? Is it a db like MySQL, something like RocksDB, or even just in-memory? When you say another topic, is this the topic consumed by the same Samza job as your 5-minutes-job, or in a separate job? What is the relation between the topic and the application name? Thanks, Fang, Yan yanfang...@gmail.com