Hi,
Thanks for the reply,
I have tried the same code and got an error
message org.apache.ignite.IgniteCheckedException: Failed to finish
operation (too many remaps): 32
tried in Ignite 2.10.0 and 2.13.0 both are having same problem.

Regards,
Charlin



On Tue, 31 May 2022 at 16:27, Pavel Tupitsyn <ptupit...@apache.org> wrote:

> If you run multiple nodes in the cluster, the receiver may be invoked on
> another node, so the breakpoint is not reached.
> I've simplified yor code a bit and it works as expected:
> https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c
>
> On Mon, May 30, 2022 at 11:22 AM Charlin S <charli...@hotelhub.com> wrote:
>
>> Hi,
>> Thanks for the reply,
>> First option working for me by creating a cache instance with expiry
>> policy just before datastreamer.
>> My curiosity with datastreamer and receiver also.
>>
>> no build error with new changes, but application not working as expected.
>> added breakpoint in MyStreamReceiver but not reached
>>
>> using (var cacheDataStreamer =
>> DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string,
>> T>(cacheName))
>>                 {
>>                     cacheDataStreamer.AllowOverwrite = true;
>>                     cacheDataStreamer.Receiver = new
>> MyStreamReceiver<T>();
>>                     foreach (var item in data)
>>                     {
>>                         string cacheKey = item.Key;
>>                         int index = cacheKey.IndexOf("Model:");
>>                         if (index > 0)
>>                             cacheKey = cacheKey.Insert(index +
>> "Model:".Length, CacheKeyDefault);
>>                         else
>>                             cacheKey = CacheKeyDefault + cacheKey;
>>                          cacheDataStreamer.AddData(cacheName + ":" +
>> cacheKey, item.Value);
>>
>>
>>                     }
>>                     cacheDataStreamer.Flush();
>>                 }
>>
>> public  class MyStreamReceiver<T> : IStreamReceiver<string, T>
>>     {
>>         public void Receive(ICache<string, T> cache,
>> ICollection<ICacheEntry<string, T>> entries)
>>         {
>>             foreach (var entry in entries)
>>             {
>>                 cache.WithExpiryPolicy(new
>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>> entry.Value);
>>             }
>>         }
>>     }
>>
>> Regards,
>> Charlin
>>
>>
>> On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <ptupit...@apache.org>
>> wrote:
>>
>>> 1. You can set expiry policy in CacheConfiguration so that entries
>>> inserted with DataStreamer are also affected,
>>> see
>>> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>>>
>>> 2. Compiler error says it all. Generic arguments don't match.
>>> Try changing
>>> MyStreamReceiver : IStreamReceiver<string, object>
>>> to
>>> MyStreamReceiver<T> : IStreamReceiver<string, T>
>>>
>>> On Thu, May 26, 2022 at 5:24 PM Charlin S <charli...@hotelhub.com>
>>> wrote:
>>>
>>>> We have a requirement to set data to expire after some time.
>>>> I set the WithExpiryPolicy for cache instance, but the data added by
>>>> GetDataStreamer does not expire, due to it returning a new instance with
>>>> default policies.
>>>> So I am trying to use IStreamReceiver but not able to build the
>>>> solution.
>>>>
>>>> IStreamReceiver  Code:
>>>>  public  class MyStreamReceiver : IStreamReceiver<string, object>
>>>>     {
>>>>         public void Receive(ICache<string, object> cache,
>>>> ICollection<ICacheEntry<string, object>> entries)
>>>>         {
>>>>             foreach (var entry in entries)
>>>>             {
>>>>                 cache.WithExpiryPolicy(new
>>>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>>>> entry.Value);
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>> Datastreamer code error
>>>> [image: image.png]
>>>>
>>>> How to implement IStreamReceiver. Please help me on this.
>>>> Regards,
>>>> Charlin
>>>>
>>>>
>>>>
>>>>

Reply via email to