[ https://issues.apache.org/jira/browse/IGNITE-19561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nikita Amelchev updated IGNITE-19561: ------------------------------------- Fix Version/s: (was: 2.16) > Ignite thin client continuous query listener cannot listen to all events > ------------------------------------------------------------------------ > > Key: IGNITE-19561 > URL: https://issues.apache.org/jira/browse/IGNITE-19561 > Project: Ignite > Issue Type: Bug > Components: cache, thin client > Affects Versions: 2.15 > Environment: JDK 1.8 > Windows 10 > Reporter: Mengyu Jing > Assignee: Pavel Tupitsyn > Priority: Major > Attachments: result1.log, result2.log > > > *Problem scenario:* > Start the Ignite server of one node, start one thin client and create a > continuous query listener, and then use 50 threads to add 500 data to the > cache concurrently. > *Problem phenomenon:* > Through the information printed on the listener, it was found that the number > of events listened to each time varies, possibly 496, 498, 499 or 500... > *Test Code:* > {code:java} > import org.apache.ignite.Ignite; > import org.apache.ignite.Ignition; > import org.apache.ignite.configuration.IgniteConfiguration; > import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; > import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; > import java.util.ArrayList; > import java.util.List; > public class StartServer { > public static void main(String[] args) { > IgniteConfiguration igniteConfiguration = new IgniteConfiguration(); > TcpDiscoverySpi spi = new TcpDiscoverySpi(); > List<String> addrList = new ArrayList<>(); > addrList.add("127.0.0.1:47500"); > TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); > ipFinder.setAddresses(addrList); > spi.setIpFinder(ipFinder); > igniteConfiguration.setDiscoverySpi(spi); > Ignite ignite = Ignition.start(igniteConfiguration); > } > } > {code} > {code:java} > import org.apache.ignite.Ignition; > import org.apache.ignite.cache.query.ContinuousQuery; > import org.apache.ignite.client.ClientCache; > import org.apache.ignite.client.IgniteClient; > import org.apache.ignite.configuration.ClientConfiguration; > import javax.cache.event.CacheEntryEvent; > import javax.cache.event.CacheEntryListenerException; > import javax.cache.event.CacheEntryUpdatedListener; > import java.util.Iterator; > public class StartThinClient { > public static void main(String[] args) throws InterruptedException { > String addr = "127.0.0.1:10800"; > int threadNmu = 50; > ClientConfiguration clientConfiguration = new ClientConfiguration(); > clientConfiguration.setAddresses(addr); > IgniteClient client1 = Ignition.startClient(clientConfiguration); > ClientCache<Object, Object> cache1 = client1.getOrCreateCache("test"); > ContinuousQuery<Object, Object> query = new ContinuousQuery<>(); > query.setLocalListener(new CacheEntryUpdatedListener<Object, > Object>() { > @Override > public void onUpdated(Iterable<CacheEntryEvent<?, ?>> > cacheEntryEvents) throws CacheEntryListenerException { > Iterator<CacheEntryEvent<?, ?>> iterator = > cacheEntryEvents.iterator(); > while (iterator.hasNext()) { > CacheEntryEvent<?, ?> next = iterator.next(); > System.out.println("----" + next.getKey()); > } > } > }); > cache1.query(query); > IgniteClient client2 = Ignition.startClient(clientConfiguration); > ClientCache<Object, Object> cache2 = client2.cache("test"); > Thread[] threads = new Thread[threadNmu]; > for (int i = 0; i < threads.length; ++i) { > threads[i] = new Thread(new OperationInsert(cache2, i, 500, > threadNmu)); > } > for (int i = 0; i < threads.length; ++i) { > threads[i].start(); > } > for (Thread thread : threads) { > thread.join(); > } > Thread.sleep(60000); > } > static class OperationInsert implements Runnable { > private ClientCache<Object, Object> cache; > private int k; > private Integer test_rows; > private Integer thread_cnt; > public OperationInsert(ClientCache<Object, Object> cache, int k, > Integer test_rows, Integer thread_cnt) { > this.cache = cache; > this.k = k; > this.test_rows = test_rows; > this.thread_cnt = thread_cnt; > } > @Override > public void run() { > for (int i = 1000000 + (test_rows/thread_cnt) * k; i < 1000000 + > (test_rows/thread_cnt) * (k + 1); i++) { > cache.put("" + i, "aaa"); > } > } > } > } {code} > *Running results:* > *[^result1.log][^result2.log]* > *Version:* > The testing program uses Ignite version 2.15.0. > I attempted to insert data using one thread and did not observe any event > loss. In addition, I also attempted an Ignite cluster with two or three > nodes, which can still listen to all 500 events even when inserting data > using multiple threads.This problem seems to only occur when concurrent > threads insert data into a node. -- This message was sent by Atlassian Jira (v8.20.10#820010)