Here is a reproducer for this btw. Run the mainclass with program argument READER and again with argument WRITER. In the console for WRITER press a key (this will generate an A and 100 associated Bs) READER subscribes to A and gets the associated B's with a scan query. However, it takes some number of retries before all 100 arrive.
package com.testproject.server; import java.util.Arrays; import java.util.List; import java.util.Scanner; import javax.cache.Cache.Entry; import javax.cache.CacheException; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TransactionProblem{ private static final Logger LOGGER = LoggerFactory.getLogger(TransactionProblem.class); private static class TestIgniteConfiguration extends IgniteConfiguration { public TestIgniteConfiguration(String name){ setWorkDirectory("c:\\data\\testproject\\"+name); TcpDiscoveryVmIpFinder tcpPortConfig = new TcpDiscoveryVmIpFinder(); tcpPortConfig.setAddresses(Arrays.asList("localhost:47500", "localhost:47501")); TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); discoverySpi.setIpFinder(tcpPortConfig); setDiscoverySpi(discoverySpi); setPeerClassLoadingEnabled(true); } } private static class TestCacheConfiguration extends CacheConfiguration { public TestCacheConfiguration(String name){ super(name); setRebalanceMode(CacheRebalanceMode.SYNC); setCacheMode(CacheMode.REPLICATED); setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); } } @IgniteAsyncCallback private static class ACallback implements CacheEntryUpdatedListener<BinaryObject, BinaryObject> { private final Ignite ignite; public ACallback(Ignite ignite) { this.ignite = ignite; } @Override public void onUpdated( Iterable<CacheEntryEvent<? extends BinaryObject, ? extends BinaryObject>> cacheEntryEvents) throws CacheEntryListenerException { cacheEntryEvents.forEach(e -> { LOGGER.info("Continuous update: {}", e); BinaryObject b = e.getValue(); long id = b.field("ID"); LOGGER.info("ID is {}", id); // find the B's for this A // keep retrying until 100 are seen int count=0; long start = System.currentTimeMillis(); while(count<100){ count = printBs(id); } long end = System.currentTimeMillis(); LOGGER.info("Took {} ms to receive all B's", (end-start)); } ); } private int printBs(long id) { IgniteCache cacheB = ignite.cache("B").withKeepBinary(); ScanQuery<String, BinaryObject> scanQuery = new ScanQuery<>( (IgniteBiPredicate<String, BinaryObject>) (key, value) -> value .field("PARENT_ID").equals(id)); cacheB.query(scanQuery); List<?> scanResults = cacheB.query(scanQuery).getAll(); LOGGER.debug("Received {} scan results", scanResults.size()); return scanResults.size(); } } public static void main(String[] args){ String type = args.length>0?args[0]:"BLANK"; if(!"READER".equals(type) && !"WRITER".equals(type)){ throw new UnsupportedOperationException("Unknown option "+type+". Choose one one READER or WRITER"); } Ignite ignite = Ignition.start(new TestIgniteConfiguration(type)); LOGGER.info("Node was successfully started"); IgniteCache<String, BinaryObject> cacheA = ignite.getOrCreateCache(new TestCacheConfiguration("A")).withKeepBinary(); IgniteCache<String, BinaryObject> cacheB = ignite.getOrCreateCache(new TestCacheConfiguration("B")).withKeepBinary(); if("WRITER".equals(type)){ // generate A and 100 associated B's. Write them all in one transaction Scanner scanner = new Scanner(System.in); while (true) { long id=System.currentTimeMillis(); LOGGER.info("Press a key to generate an A and 100 B's with join ID "+id); scanner.nextLine(); BinaryObjectBuilder aBuilder = ignite.binary().builder("A"); aBuilder.setField("ID", id); // begin a transaction try (Transaction tx = ignite.transactions().txStart( TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED, 30000, 101)) { try { // insert an A record with ID cacheA.put("ID_" + id, aBuilder.build()); // insert 100 B records with this PARENT_ID BinaryObjectBuilder bBuilder = ignite.binary().builder("B"); bBuilder.setField("PARENT_ID", id); for (int i = 0; i < 100; i++) { bBuilder.setField("B_ID", i); cacheB.put("ID_" + id + "_B_" + i, bBuilder.build()); } tx.commit(); LOGGER.info("COMMITTED"); } catch (CacheException e) { tx.rollback(); } } // end transaction } } else{ // subscribe to A's and print associated B's ContinuousQuery<BinaryObject, BinaryObject> query = new ContinuousQuery<>(); query.setInitialQuery(new ScanQuery()); query.setLocalListener(new ACallback(ignite)); QueryCursor<Entry<BinaryObject, BinaryObject>> cur = cacheA.query(query); cur.forEach(entry -> { LOGGER.info("Initial record: {}", entry); }); } } } -- Sent from: http://apache-ignite-users.70518.x6.nabble.com/