[ 
https://issues.apache.org/jira/browse/IGNITE-19561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikita Amelchev updated IGNITE-19561:
-------------------------------------
    Fix Version/s:     (was: 2.16)

> Ignite thin client continuous query listener cannot listen to all events
> ------------------------------------------------------------------------
>
>                 Key: IGNITE-19561
>                 URL: https://issues.apache.org/jira/browse/IGNITE-19561
>             Project: Ignite
>          Issue Type: Bug
>          Components: cache, thin client
>    Affects Versions: 2.15
>         Environment: JDK 1.8 
> Windows 10
>            Reporter: Mengyu Jing
>            Assignee: Pavel Tupitsyn
>            Priority: Major
>         Attachments: result1.log, result2.log
>
>
> *Problem scenario:*
> Start a single-node Ignite server, start one thin client and register a 
> continuous query listener, and then use 50 threads to put 500 entries into 
> the cache concurrently.
> *Problem phenomenon:*
> The keys printed by the listener show that the number of events received 
> varies from run to run: it may be 496, 498, 499 or 500.
> *Test Code:*
> {code:java}
> import org.apache.ignite.Ignite;
> import org.apache.ignite.Ignition;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
> import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
> import java.util.ArrayList;
> import java.util.List;
> public class StartServer {
>     public static void main(String[] args) {
>         IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
>         TcpDiscoverySpi spi = new TcpDiscoverySpi();
>         List<String> addrList = new ArrayList<>();
>         addrList.add("127.0.0.1:47500");
>         TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
>         ipFinder.setAddresses(addrList);
>         spi.setIpFinder(ipFinder);
>         igniteConfiguration.setDiscoverySpi(spi);
>         Ignite ignite = Ignition.start(igniteConfiguration);
>     }
> }
> {code}
> {code:java}
> import org.apache.ignite.Ignition;
> import org.apache.ignite.cache.query.ContinuousQuery;
> import org.apache.ignite.client.ClientCache;
> import org.apache.ignite.client.IgniteClient;
> import org.apache.ignite.configuration.ClientConfiguration;
> import javax.cache.event.CacheEntryEvent;
> import javax.cache.event.CacheEntryListenerException;
> import javax.cache.event.CacheEntryUpdatedListener;
> import java.util.Iterator;
> public class StartThinClient {
>     public static void main(String[] args) throws InterruptedException {
>         String addr = "127.0.0.1:10800";
>         int threadNum = 50;
>         ClientConfiguration clientConfiguration = new ClientConfiguration();
>         clientConfiguration.setAddresses(addr);
>         // The first client registers the continuous query listener.
>         IgniteClient client1 = Ignition.startClient(clientConfiguration);
>         ClientCache<Object, Object> cache1 = client1.getOrCreateCache("test");
>         ContinuousQuery<Object, Object> query = new ContinuousQuery<>();
>         query.setLocalListener(
>                 new CacheEntryUpdatedListener<Object, Object>() {
>             @Override
>             public void onUpdated(
>                     Iterable<CacheEntryEvent<?, ?>> cacheEntryEvents)
>                     throws CacheEntryListenerException {
>                 // Print the key of every event delivered to the listener.
>                 Iterator<CacheEntryEvent<?, ?>> iterator =
>                         cacheEntryEvents.iterator();
>                 while (iterator.hasNext()) {
>                     CacheEntryEvent<?, ?> next = iterator.next();
>                     System.out.println("----" + next.getKey());
>                 }
>             }
>         });
>         cache1.query(query);
>         // The second client performs the concurrent inserts.
>         IgniteClient client2 = Ignition.startClient(clientConfiguration);
>         ClientCache<Object, Object> cache2 = client2.cache("test");
>         Thread[] threads = new Thread[threadNum];
>         for (int i = 0; i < threads.length; ++i) {
>             threads[i] = new Thread(
>                     new OperationInsert(cache2, i, 500, threadNum));
>         }
>         for (int i = 0; i < threads.length; ++i) {
>             threads[i].start();
>         }
>         for (Thread thread : threads) {
>             thread.join();
>         }
>         // Keep the process alive so that late events can still arrive.
>         Thread.sleep(60000);
>     }
>     static class OperationInsert implements Runnable {
>         private ClientCache<Object, Object> cache;
>         private int k;
>         private Integer test_rows;
>         private Integer thread_cnt;
>         public OperationInsert(ClientCache<Object, Object> cache, int k,
>                 Integer test_rows, Integer thread_cnt) {
>             this.cache = cache;
>             this.k = k;
>             this.test_rows = test_rows;
>             this.thread_cnt = thread_cnt;
>         }
>         @Override
>         public void run() {
>             for (int i = 1000000 + (test_rows/thread_cnt) * k;
>                     i < 1000000 + (test_rows/thread_cnt) * (k + 1); i++) {
>                 cache.put("" + i, "aaa");
>             }
>         }
>     }
> }
> {code}
> *Running results:*
> *[^result1.log][^result2.log]*
> *Version:*
> The testing program uses Ignite version 2.15.0.
> When inserting data from a single thread, I did not observe any event loss. 
> I also tried Ignite clusters with two and three nodes, and the listener 
> still received all 500 events even when multiple threads inserted data. 
> The problem seems to occur only when concurrent threads insert data into a 
> single-node cluster.
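
To see which keys are lost rather than only how many events arrived, the listener output could be collected and compared against the expected key set. The following is a minimal sketch, not part of the original report or of Ignite: `EventTracker` and its methods are hypothetical helper names, and the key layout (500 string keys starting at 1000000, split evenly across the threads) is assumed from `OperationInsert` in the test code above.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not an Ignite class): records the keys seen by a
// listener and reports which expected keys never arrived.
public class EventTracker {
    // Thread-safe set, since listener callbacks may run on another thread.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Call from the listener for every delivered event key.
    public void record(Object key) {
        seen.add(String.valueOf(key));
    }

    // Number of distinct keys recorded so far.
    public int count() {
        return seen.size();
    }

    // Rebuilds the keys that the insert threads write, using the same
    // arithmetic as OperationInsert in the test code (assumed layout).
    public static Set<String> expectedKeys(int testRows, int threadCnt) {
        Set<String> keys = new TreeSet<>();
        int perThread = testRows / threadCnt;
        for (int k = 0; k < threadCnt; k++) {
            int from = 1000000 + perThread * k;
            int to = 1000000 + perThread * (k + 1);
            for (int i = from; i < to; i++) {
                keys.add("" + i);
            }
        }
        return keys;
    }

    // Expected keys for which no event was recorded, in sorted order.
    public Set<String> missing(Set<String> expected) {
        Set<String> result = new TreeSet<>(expected);
        result.removeAll(seen);
        return result;
    }
}
```

In the test program, the listener would call `tracker.record(next.getKey())` for each event, and after the threads are joined and the wait has elapsed, `tracker.missing(EventTracker.expectedKeys(500, 50))` would list the keys whose events were never delivered.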



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
