Hi,

Thanks for the reply. I tried the same code and got this error message:

org.apache.ignite.IgniteCheckedException: Failed to finish operation (too many remaps): 32

I tried it on Ignite 2.10.0 and 2.13.0, and both have the same problem.

Regards,
Charlin

On Tue, 31 May 2022 at 16:27, Pavel Tupitsyn <ptupit...@apache.org> wrote:

> If you run multiple nodes in the cluster, the receiver may be invoked on
> another node, so the breakpoint is not reached.
> I've simplified your code a bit and it works as expected:
> https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c
>
> On Mon, May 30, 2022 at 11:22 AM Charlin S <charli...@hotelhub.com> wrote:
>
>> Hi,
>> Thanks for the reply.
>> The first option works for me: I create a cache instance with an expiry
>> policy just before the data streamer.
>> I am still curious about the data streamer with a receiver, though.
>>
>> There is no build error with the new changes, but the application does
>> not work as expected. I added a breakpoint in MyStreamReceiver, but it
>> is not reached.
>>
>> using (var cacheDataStreamer = DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string, T>(cacheName))
>> {
>>     cacheDataStreamer.AllowOverwrite = true;
>>     cacheDataStreamer.Receiver = new MyStreamReceiver<T>();
>>     foreach (var item in data)
>>     {
>>         string cacheKey = item.Key;
>>         int index = cacheKey.IndexOf("Model:");
>>         if (index > 0)
>>             cacheKey = cacheKey.Insert(index + "Model:".Length, CacheKeyDefault);
>>         else
>>             cacheKey = CacheKeyDefault + cacheKey;
>>         cacheDataStreamer.AddData(cacheName + ":" + cacheKey, item.Value);
>>     }
>>     cacheDataStreamer.Flush();
>> }
>>
>> public class MyStreamReceiver<T> : IStreamReceiver<string, T>
>> {
>>     public void Receive(ICache<string, T> cache, ICollection<ICacheEntry<string, T>> entries)
>>     {
>>         foreach (var entry in entries)
>>         {
>>             cache.WithExpiryPolicy(new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key, entry.Value);
>>         }
>>     }
>> }
>>
>> Regards,
>> Charlin
>>
>> On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <ptupit...@apache.org> wrote:
>>
>>> 1. You can set expiry policy in CacheConfiguration so that entries
>>> inserted with DataStreamer are also affected, see
>>> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>>>
>>> 2. Compiler error says it all. Generic arguments don't match.
>>> Try changing
>>>     MyStreamReceiver : IStreamReceiver<string, object>
>>> to
>>>     MyStreamReceiver<T> : IStreamReceiver<string, T>
>>>
>>> On Thu, May 26, 2022 at 5:24 PM Charlin S <charli...@hotelhub.com> wrote:
>>>
>>>> We have a requirement to make data expire after some time.
>>>> I set WithExpiryPolicy on the cache instance, but the data added through
>>>> GetDataStreamer does not expire, because WithExpiryPolicy returns a new
>>>> instance and the streamer uses the default policies.
>>>> So I am trying to use IStreamReceiver, but I am not able to build the
>>>> solution.
>>>>
>>>> IStreamReceiver code:
>>>>
>>>> public class MyStreamReceiver : IStreamReceiver<string, object>
>>>> {
>>>>     public void Receive(ICache<string, object> cache, ICollection<ICacheEntry<string, object>> entries)
>>>>     {
>>>>         foreach (var entry in entries)
>>>>         {
>>>>             cache.WithExpiryPolicy(new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key, entry.Value);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> Datastreamer code error:
>>>> [image: image.png]
>>>>
>>>> How do I implement IStreamReceiver? Please help me with this.
>>>>
>>>> Regards,
>>>> Charlin
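
For reference, the first option discussed in this thread (an expiry policy set in CacheConfiguration, so that entries loaded through the data streamer expire too) might look roughly like the sketch below. It is only a minimal illustration, not the code from the linked gist or Stack Overflow answer. The cache name "myCache", the string value type, the sample key/value, and the TenMinuteExpiryPolicyFactory class name are hypothetical; the 600-second TTL is taken from the receiver code in the thread. It assumes the Apache.Ignite NuGet package and a single locally started node.

```csharp
using System;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Cache.Expiry;
using Apache.Ignite.Core.Common;

// Hypothetical factory: Ignite calls CreateInstance to obtain the cache's expiry policy.
public class TenMinuteExpiryPolicyFactory : IFactory<IExpiryPolicy>
{
    public IExpiryPolicy CreateInstance()
    {
        // Expire 600 seconds after creation; null means "no change" on update and access.
        return new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null);
    }
}

public static class Program
{
    public static void Main()
    {
        // Starts a node with default configuration (assumption: no existing cluster setup).
        using (var ignite = Ignition.Start())
        {
            // The policy is part of the cache configuration, so it does not depend on
            // which ICache instance (or data streamer) performs the write.
            var cfg = new CacheConfiguration("myCache")
            {
                ExpiryPolicyFactory = new TenMinuteExpiryPolicyFactory()
            };
            var cache = ignite.GetOrCreateCache<string, string>(cfg);

            using (var streamer = ignite.GetDataStreamer<string, string>(cache.Name))
            {
                // AddData is the call used in this thread; newer releases also offer Add.
                streamer.AddData("key1", "value1");
                streamer.Flush();
            }

            // Prints whether the entry is currently present (before the TTL elapses).
            Console.WriteLine(cache.ContainsKey("key1"));
        }
    }
}
```

With this approach no custom stream receiver is needed for expiry, which would also sidestep the receiver being invoked on another node. Note that the configuration only takes effect when the cache is created; if a cache with that name already exists, GetOrCreateCache returns it with its existing settings.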