Hi,
I am using a 3-node cluster (2 servers and 1 client). Why can we not run
this with multiple nodes?
The first option is also no longer working. It was working before, and now
it is failing.

Regards,
Charlin




On Wed, 1 Jun 2022 at 11:28, Pavel Tupitsyn <ptupit...@apache.org> wrote:

> You certainly have other nodes running somewhere, and they get picked up
> when you run examples, yielding incorrect results.
> Try adding this right after Ignition.Start:
>
> var nodeCount = ignite.GetCluster().GetNodes().Count;
> if (nodeCount > 1)
> {
>     throw new Exception("Unexpected node count: " + nodeCount);
> }
>
>
> On Tue, May 31, 2022 at 5:30 PM Charlin S <charli...@hotelhub.com> wrote:
>
>> Hi,
>> Thanks for the reply,
>> I have tried the same code and got this error message:
>> org.apache.ignite.IgniteCheckedException: Failed to finish
>> operation (too many remaps): 32
>> I tried Ignite 2.10.0 and 2.13.0; both have the same problem.
>>
>> Regards,
>> Charlin
>>
>>
>>
>> On Tue, 31 May 2022 at 16:27, Pavel Tupitsyn <ptupit...@apache.org>
>> wrote:
>>
>>> If you run multiple nodes in the cluster, the receiver may be invoked on
>>> another node, so the breakpoint is not reached.
>>> I've simplified your code a bit and it works as expected:
>>> https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c
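>>>
>>> One way to see where the receiver actually runs, without relying on a
>>> breakpoint, is to log from inside Receive. A rough sketch, assuming a
>>> hypothetical LoggingStreamReceiver class (not part of Ignite) that every
>>> server node is able to load; the message is printed in the console of
>>> whichever node processes the batch:
>>>
>>> ```csharp
>>> using System;
>>> using System.Collections.Generic;
>>> using System.Diagnostics;
>>> using Apache.Ignite.Core.Cache;
>>> using Apache.Ignite.Core.Cache.Expiry;
>>> using Apache.Ignite.Core.Datastream;
>>>
>>> // Hypothetical receiver: same logic as MyStreamReceiver<T>, plus a log line.
>>> public class LoggingStreamReceiver<T> : IStreamReceiver<string, T>
>>> {
>>>     public void Receive(ICache<string, T> cache,
>>>         ICollection<ICacheEntry<string, T>> entries)
>>>     {
>>>         // Identify the host and process that handle this batch.
>>>         Console.WriteLine("Receive: {0} entries, host {1}, pid {2}",
>>>             entries.Count, Environment.MachineName,
>>>             Process.GetCurrentProcess().Id);
>>>
>>>         // Create the expiry-aware cache view once per batch.
>>>         var expiring = cache.WithExpiryPolicy(
>>>             new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null));
>>>
>>>         foreach (var entry in entries)
>>>         {
>>>             expiring.Put(entry.Key, entry.Value);
>>>         }
>>>     }
>>> }
>>> ```
>>>
>>> If the line shows up on a server node rather than in the debugged
>>> process, that would explain why the breakpoint is never hit.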
>>>
>>> On Mon, May 30, 2022 at 11:22 AM Charlin S <charli...@hotelhub.com>
>>> wrote:
>>>
>>>> Hi,
>>>> Thanks for the reply,
>>>> The first option works for me: creating a cache instance with an expiry
>>>> policy just before the data streamer.
>>>> I am also curious about using the data streamer with a receiver.
>>>>
>>>> There is no build error with the new changes, but the application is not
>>>> working as expected. I added a breakpoint in MyStreamReceiver, but it is
>>>> not reached.
>>>>
>>>> using (var cacheDataStreamer = DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string, T>(cacheName))
>>>> {
>>>>     cacheDataStreamer.AllowOverwrite = true;
>>>>     cacheDataStreamer.Receiver = new MyStreamReceiver<T>();
>>>>     foreach (var item in data)
>>>>     {
>>>>         string cacheKey = item.Key;
>>>>         int index = cacheKey.IndexOf("Model:");
>>>>         if (index > 0)
>>>>             cacheKey = cacheKey.Insert(index + "Model:".Length, CacheKeyDefault);
>>>>         else
>>>>             cacheKey = CacheKeyDefault + cacheKey;
>>>>         cacheDataStreamer.AddData(cacheName + ":" + cacheKey, item.Value);
>>>>     }
>>>>     cacheDataStreamer.Flush();
>>>> }
>>>>
>>>> public class MyStreamReceiver<T> : IStreamReceiver<string, T>
>>>> {
>>>>     public void Receive(ICache<string, T> cache, ICollection<ICacheEntry<string, T>> entries)
>>>>     {
>>>>         foreach (var entry in entries)
>>>>         {
>>>>             cache.WithExpiryPolicy(new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null))
>>>>                 .Put(entry.Key, entry.Value);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> Regards,
>>>> Charlin
>>>>
>>>>
>>>> On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <ptupit...@apache.org>
>>>> wrote:
>>>>
>>>>> 1. You can set expiry policy in CacheConfiguration so that entries
>>>>> inserted with DataStreamer are also affected,
>>>>> see
>>>>> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>>>>>
>>>>> 2. Compiler error says it all. Generic arguments don't match.
>>>>> Try changing
>>>>> MyStreamReceiver : IStreamReceiver<string, object>
>>>>> to
>>>>> MyStreamReceiver<T> : IStreamReceiver<string, T>
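>>>>>
>>>>> For point 1, a minimal sketch of the approach from the linked answer.
>>>>> The cache name "myCache", the class name TenMinuteExpiryFactory, and
>>>>> the string value type are placeholders, and it assumes every node in
>>>>> the cluster can load the factory class:
>>>>>
>>>>> ```csharp
>>>>> using System;
>>>>> using Apache.Ignite.Core;
>>>>> using Apache.Ignite.Core.Cache.Configuration;
>>>>> using Apache.Ignite.Core.Cache.Expiry;
>>>>> using Apache.Ignite.Core.Common;
>>>>>
>>>>> // Hypothetical factory: entries expire 600 seconds after creation.
>>>>> public class TenMinuteExpiryFactory : IFactory<IExpiryPolicy>
>>>>> {
>>>>>     public IExpiryPolicy CreateInstance()
>>>>>     {
>>>>>         return new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null);
>>>>>     }
>>>>> }
>>>>>
>>>>> public static class Program
>>>>> {
>>>>>     public static void Main()
>>>>>     {
>>>>>         using (var ignite = Ignition.Start())
>>>>>         {
>>>>>             // The expiry policy is part of the cache configuration,
>>>>>             // so it does not depend on how entries are written.
>>>>>             var cfg = new CacheConfiguration("myCache")
>>>>>             {
>>>>>                 ExpiryPolicyFactory = new TenMinuteExpiryFactory()
>>>>>             };
>>>>>             var cache = ignite.GetOrCreateCache<string, string>(cfg);
>>>>>
>>>>>             using (var streamer =
>>>>>                 ignite.GetDataStreamer<string, string>(cache.Name))
>>>>>             {
>>>>>                 streamer.AddData("key1", "value1");
>>>>>                 streamer.Flush();
>>>>>             }
>>>>>         }
>>>>>     }
>>>>> }
>>>>> ```
>>>>>
>>>>> With this configuration no custom receiver is needed for expiry.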
>>>>>
>>>>> On Thu, May 26, 2022 at 5:24 PM Charlin S <charli...@hotelhub.com>
>>>>> wrote:
>>>>>
>>>>>> We have a requirement for data to expire after some time.
>>>>>> I set WithExpiryPolicy on the cache instance, but the data added by
>>>>>> GetDataStreamer does not expire, because it returns a new instance with
>>>>>> default policies.
>>>>>> So I am trying to use IStreamReceiver, but I am not able to build the
>>>>>> solution.
>>>>>>
>>>>>> IStreamReceiver code:
>>>>>> public class MyStreamReceiver : IStreamReceiver<string, object>
>>>>>> {
>>>>>>     public void Receive(ICache<string, object> cache, ICollection<ICacheEntry<string, object>> entries)
>>>>>>     {
>>>>>>         foreach (var entry in entries)
>>>>>>         {
>>>>>>             cache.WithExpiryPolicy(new ExpiryPolicy(TimeSpan.FromSeconds(600), null, null))
>>>>>>                 .Put(entry.Key, entry.Value);
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Datastreamer code error
>>>>>> [image: image.png]
>>>>>>
>>>>>> How do I implement IStreamReceiver? Please help me with this.
>>>>>> Regards,
>>>>>> Charlin
>>>>>>
>>>>>>
>>>>>>
>>>>>>
