Hi drosso,

Luckily, there is no mystery. By the way, which version of Ignite do you use?

The clue to the strange behavior here is the topology change during your
test execution. As far as I can see, the Putter node is a server data node
as well, so it will hold some data partitions and consequently will receive
some OBJECT_PUT events. The second seemingly strange thing here is observing
events for the same keys on different nodes. It is explained by the
so-called "late
affinity assignment". Putter enters the cluster and some partitions are
loaded to it from other nodes. But Putter is usable before all the data is
actually loaded: instead of waiting for the data and freezing the cluster
for a possibly long time, Ignite creates a temporary backup partition on the
Putter node, and the primary partition is kept on one of the ServerNodes
from your example (when all the data has been loaded by Putter from the
other nodes, the partitions on it will be considered primary and the
previous primary partitions on the other nodes will be destroyed). Events
like OBJECT_PUT are fired on backup partitions as well.
This explains why you observe events for the same keys on different nodes.
If you make Putter a non-data node for the target cache (e.g. by starting it
as a client node), then you will see events only on the ServerNodes.

чт, 11 окт. 2018 г. в 11:20, drosso <dro...@inwind.it>:

> Hi Ivan,
> thank you for your interest! here below you can find the code for the 2
> sample programs:
>
> *********** ServerNode.java **************
>
> package TestATServerMode;
>
> import javax.cache.Cache;
> import javax.cache.event.CacheEntryEvent;
> import javax.cache.event.CacheEntryUpdatedListener;
> import javax.cache.event.EventType;
>
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.IgniteException;
> import org.apache.ignite.Ignition;
> import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
> import org.apache.ignite.cache.query.ContinuousQuery;
> import org.apache.ignite.cache.query.QueryCursor;
> import org.apache.ignite.cache.query.ScanQuery;
> import org.apache.ignite.events.*;
> import org.apache.ignite.lang.IgniteBiPredicate;
> import org.apache.ignite.lang.IgnitePredicate;
>
> import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
> import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
> import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
>
> import java.util.UUID;
>
> /**
>  * Starts up an empty node with example compute configuration.
>  */
> public class ServerNode {
>         /**
>          * Start up an empty node with example compute configuration.
>          *
>          * @param args
>          *            Command line arguments, none required.
>          * @throws IgniteException
>          *             If failed.
>          */
>         private static final String CACHE_NAME = "MyCache";
>
>         @SuppressWarnings("deprecation")
>         public static void main(String[] args) throws IgniteException {
>                 Ignition.start("config/example-ignite.xml");
>
>                 Ignite ignite = Ignition.ignite();
>
>                 // Get an instance of named cache.
>                 final IgniteCache<Integer, String> cache =
> ignite.getOrCreateCache(CACHE_NAME);
>
>                 // Sample remote filter
>
>                 IgnitePredicate<CacheEvent> locLsnr = new
> IgnitePredicate<CacheEvent>() {
>                         @Override
>                         public boolean apply(CacheEvent evt) {
>                                 System.out.println("LOCAL cache event
> [evt=" + evt.name() + ",
> cacheName=" + evt.cacheName() + ", key="
>                                                 + evt.key() + ']');
>
>                                 return true; // Return true to continue
> listening.
>                         }
>                 };
>
>                 // Register event listener for all local task execution
> events.
>                 ignite.events().localListen(locLsnr, EVT_CACHE_OBJECT_PUT);
>
>
>         }
> }
>
>
> ************ Putter.java *************************
>
> package TestATServerMode;
>
> import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
>
> import java.sql.Time;
>
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.Ignition;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.events.CacheEvent;
> import org.apache.ignite.lang.IgnitePredicate;
>
>
> @SuppressWarnings("TypeMayBeWeakened")
> public class Putter {
>     /** Cache name. */
>     private static final String CACHE_NAME = "MyCache";
>
>     /**
>      * Executes example.
>      *
>      * @param args Command line arguments, none required.
>      * @throws InterruptedException
>      */
>     public static void main(String[] args) {
>
>         // Mark this cluster member as client.
>         //Ignition.setClientMode(true);
>
>         try (Ignite ignite = Ignition.start("config/example-ignite.xml")) {
>             System.out.println();
>             System.out.println(">>> Myexample started.");
>
>             CacheConfiguration<Integer, String> cfg = new
> CacheConfiguration<>();
>
>             //cfg.setCacheMode(CacheMode.REPLICATED);
>             cfg.setName(CACHE_NAME);
>             //cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
>
>
>             IgnitePredicate<CacheEvent> lsnr = new
> IgnitePredicate<CacheEvent>() {
>                 @Override public boolean apply(CacheEvent evt) {
>                     System.out.println("Received cache event [evt=" +
> evt.name() + ", cacheName=" + evt.cacheName() +
>                         ", key=" + evt.key() + ']');
>
>                     return true; // Return true to continue listening.
>                 }
>             };
>
>             try (IgniteCache<Integer, String> cache =
> ignite.getOrCreateCache(cfg)) {
>                 if
> (ignite.cluster().forDataNodes(cache.getName()).nodes().isEmpty()) {
>                     System.out.println();
>                     System.out.println(">>> This example requires remote
> cache node nodes to be started.");
>                     System.out.println(">>> Please start at least 1 remote
> cache node.");
>                     System.out.println(">>> Refer to example's javadoc for
> details on configuration.");
>                     System.out.println();
>
>                     return;
>                 }
>
>
>                 // Register event listener for all local task execution
> events.
>                 ignite.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT);
>
>                 put(cache);
>
>             }
>         }
>     }
>
>     /**
>      * Execute individual put and get.
>      *
>      * @param cache Cache.
>      */
>     private static void put(IgniteCache<Integer, String> cache) {
>
>         long startTime = System.currentTimeMillis();
>
>
>         for (int i = 1; i < 10; i++) {
>                 // Put created data entry to cache.
>                 cache.put(i, "String"+i);
>         }
>
>         long endTime =  System.currentTimeMillis();
>
>         System.out.println("Time taken (PUT) : " + (endTime-startTime) + "
> millisec");
>
>     }
> }
>
> ************** example-ignite.xml *********************
> <?xml version="1.0" encoding="UTF-8"?>
>
>
>
>
> <beans xmlns="http://www.springframework.org/schema/beans";
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>        xsi:schemaLocation="http://www.springframework.org/schema/beans
>         http://www.springframework.org/schema/beans/spring-beans.xsd";>
>
>     <import resource="example-default.xml"/>
>
>     <bean parent="ignite.cfg"/>
> </beans>
>
> ************** example-default.xml ********************
> <?xml version="1.0" encoding="UTF-8"?>
>
>
>
>
> <beans xmlns="http://www.springframework.org/schema/beans";
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>        xmlns:util="http://www.springframework.org/schema/util";
>        xsi:schemaLocation="
>         http://www.springframework.org/schema/beans
>         http://www.springframework.org/schema/beans/spring-beans.xsd
>         http://www.springframework.org/schema/util
>         http://www.springframework.org/schema/util/spring-util.xsd";>
>     <bean abstract="true" id="ignite.cfg"
> class="org.apache.ignite.configuration.IgniteConfiguration">
>
>         <property name="peerClassLoadingEnabled" value="true"/>
>
>         <property name="cacheConfiguration">
>             <list>
>
>                 <bean
> class="org.apache.ignite.configuration.CacheConfiguration">
>                     <property name="name" value="default"/>
>                     <property name="atomicityMode" value="ATOMIC"/>
>                     <property name="backups" value="0"/>
>                 </bean>
>
>                 <bean
> class="org.apache.ignite.configuration.CacheConfiguration">
>                     <property name="name" value="MyCache"/>
>                     <property name="cacheMode" value="PARTITIONED"/>
>                     <property name="atomicityMode" value="ATOMIC"/>
>                     <property name="backups" value="0"/>
>                     <property name="readFromBackup" value="true"/>
>                     <property name="partitionLossPolicy"
> value="READ_ONLY_SAFE"/>
>
>                 </bean>
>
>
>             </list>
>         </property>
>
>
>             <property name="userAttributes">
>                 <map>
>                     <entry key="group" value="NonPutter"/>
>                 </map>
>                 </property>
>
>
>         <property name="includeEventTypes">
>             <list>
>
>                 <util:constant
> static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
>                 <util:constant
> static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
>                 <util:constant
> static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
>                 <util:constant
> static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
>                 <util:constant
>
> static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
>                 <util:constant
> static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
>
>
>                 <util:constant
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
>                 <util:constant
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
>                 <util:constant
>
> static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
>             </list>
>         </property>
>
>
>         <property name="discoverySpi">
>             <bean
> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>                 <property name="ipFinder">
>
>
>
>                     <bean
>
> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>                         <property name="addresses">
>                             <list>
>
>                                 <value>127.0.0.1:47500..47509</value>
>                             </list>
>                         </property>
>                     </bean>
>                 </property>
>             </bean>
>         </property>
>     </bean>
> </beans>
>
>
>
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>


-- 
Best regards,
Ivan Pavlukhin

Reply via email to