Here is a reproducer for this btw.

Run the mainclass with program argument READER and again with argument
WRITER.
In the console for WRITER press a key (this will generate an A and 100
associated Bs)
READER subscribes to A and gets the associated B's with a scan query.
However, it takes some number of retries before all 100 arrive.

package com.testproject.server;

import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
import javax.cache.Cache.Entry;
import javax.cache.CacheException;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionProblem{

    private static final Logger LOGGER =
LoggerFactory.getLogger(TransactionProblem.class);

    private static class TestIgniteConfiguration extends IgniteConfiguration
{

        public TestIgniteConfiguration(String name){
            setWorkDirectory("c:\\data\\testproject\\"+name);
            TcpDiscoveryVmIpFinder tcpPortConfig = new
TcpDiscoveryVmIpFinder();
            tcpPortConfig.setAddresses(Arrays.asList("localhost:47500",
"localhost:47501"));
            TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
            discoverySpi.setIpFinder(tcpPortConfig);
            setDiscoverySpi(discoverySpi);
            setPeerClassLoadingEnabled(true);
        }
    }

    private static class TestCacheConfiguration extends CacheConfiguration {
        public TestCacheConfiguration(String name){
            super(name);
            setRebalanceMode(CacheRebalanceMode.SYNC);
            setCacheMode(CacheMode.REPLICATED);
           
setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
            setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
        }
    }

    @IgniteAsyncCallback
    private static class ACallback implements
        CacheEntryUpdatedListener<BinaryObject, BinaryObject> {

        private final Ignite ignite;

        public ACallback(Ignite ignite) {
            this.ignite = ignite;
        }

        @Override
        public void onUpdated(
            Iterable<CacheEntryEvent&lt;? extends BinaryObject, ? extends
BinaryObject>> cacheEntryEvents)
            throws CacheEntryListenerException {

            cacheEntryEvents.forEach(e -> {
                    LOGGER.info("Continuous update: {}", e);
                    BinaryObject b = e.getValue();
                    long id = b.field("ID");
                    LOGGER.info("ID is {}", id);
                    // find the B's for this A
                    // keep retrying until 100 are seen
                    int count=0;
                    long start = System.currentTimeMillis();
                    while(count<100){
                        count = printBs(id);
                    }
                    long end = System.currentTimeMillis();
                    LOGGER.info("Took {} ms to receive all B's",
(end-start));
                }
            );
        }

        private int printBs(long id) {
            IgniteCache cacheB = ignite.cache("B").withKeepBinary();

            ScanQuery<String, BinaryObject> scanQuery = new ScanQuery<>(
                (IgniteBiPredicate<String, BinaryObject>) (key, value) ->
value
                    .field("PARENT_ID").equals(id));

            cacheB.query(scanQuery);
            List<?> scanResults = cacheB.query(scanQuery).getAll();
            LOGGER.debug("Received {} scan results", scanResults.size());
            return scanResults.size();
        }
    }


    public static void main(String[] args){
        String type = args.length>0?args[0]:"BLANK";
        if(!"READER".equals(type) && !"WRITER".equals(type)){
            throw new UnsupportedOperationException("Unknown option
"+type+". Choose one one READER or WRITER");
        }

        Ignite ignite = Ignition.start(new TestIgniteConfiguration(type));

        LOGGER.info("Node was successfully started");

        IgniteCache<String, BinaryObject> cacheA =
ignite.getOrCreateCache(new TestCacheConfiguration("A")).withKeepBinary();
        IgniteCache<String, BinaryObject> cacheB =
ignite.getOrCreateCache(new TestCacheConfiguration("B")).withKeepBinary();

        if("WRITER".equals(type)){
            // generate A and 100 associated B's. Write them all in one
transaction
            Scanner scanner = new Scanner(System.in);
            while (true) {
                long id=System.currentTimeMillis();
                LOGGER.info("Press a key to generate an A and 100 B's with
join ID "+id);
                scanner.nextLine();

                BinaryObjectBuilder aBuilder = ignite.binary().builder("A");
                aBuilder.setField("ID", id);

                // begin a transaction
                try (Transaction tx = ignite.transactions().txStart(
                    TransactionConcurrency.PESSIMISTIC,
                    TransactionIsolation.READ_COMMITTED, 30000,
                    101)) {

                    try {
                        // insert an A record with ID
                        cacheA.put("ID_" + id, aBuilder.build());
                        // insert 100 B records with this PARENT_ID
                        BinaryObjectBuilder bBuilder =
ignite.binary().builder("B");
                        bBuilder.setField("PARENT_ID", id);
                        for (int i = 0; i < 100; i++) {
                            bBuilder.setField("B_ID", i);
                            cacheB.put("ID_" + id + "_B_" + i,
bBuilder.build());
                        }
                        tx.commit();
                        LOGGER.info("COMMITTED");
                    } catch (CacheException e) {
                        tx.rollback();
                    }
                }
                // end transaction
            }
        }
        else{
            // subscribe to A's and print associated B's
            ContinuousQuery<BinaryObject, BinaryObject> query = new
ContinuousQuery<>();
            query.setInitialQuery(new ScanQuery());
            query.setLocalListener(new ACallback(ignite));

            QueryCursor<Entry&lt;BinaryObject, BinaryObject>> cur =
cacheA.query(query);
            cur.forEach(entry -> {
                LOGGER.info("Initial record: {}", entry);
            });
        }

    }

}




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Reply via email to