Mikhail Cherkasov created IGNITE-7523: -----------------------------------------
Summary: Exception on data expiration after sharedRDD.saveValues call Key: IGNITE-7523 URL: https://issues.apache.org/jira/browse/IGNITE-7523 Project: Ignite Issue Type: Bug Components: spark Affects Versions: 2.3 Reporter: Mikhail Cherkasov Fix For: 2.5 Reproducer:

package rdd_expiration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.Cache;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;

/**
 * Reproducer for IGNITE-7523: starts four Ignite server nodes, loads values into the
 * {@code sharedRDD} cache through {@code JavaIgniteRDD.saveValues} for about 55 seconds, and then
 * loops forever counting the entries left in that cache, which is configured with a one-minute
 * {@link CreatedExpiryPolicy}.
 * <p>
 * The remaining description is presumably carried over from the shared-RDD example this reproducer
 * was derived from:
 * <p>
 * This example demonstrates how to create a JavaIgniteRDD and share it with multiple Spark workers.
 * The goal of this particular example is to provide the simplest code example of this logic.
 * <p>
 * This example will start Ignite in the embedded mode and will start a JavaIgniteContext on each
 * Spark worker node.
 * <p>
 * The example can work in the standalone mode as well that can be enabled by setting
 * JavaIgniteContext's {@code standalone} property to {@code true} and running an Ignite node
 * separately with `examples/config/spark/example-shared-rdd.xml` config.
 */
public class RddExpiration {
    /**
     * Executes the example.
     * <p>
     * Never returns normally: the final loop that counts cache entries has no exit condition.
     *
     * @param args Command line arguments, none required.
     * @throws InterruptedException Declared by the signature.
     */
    public static void main(String args[]) throws InterruptedException {
        Ignite server = null;

        // Start four server nodes in this JVM; only the last started instance is kept in 'server'.
        for (int i = 0; i < 4; i++) {
            IgniteConfiguration serverCfg = createIgniteCfg();

            // createIgniteCfg() produces a client configuration, so switch it to server mode here.
            serverCfg.setClientMode(false);
            serverCfg.setIgniteInstanceName("Server" + i);
            server = Ignition.start(serverCfg);
        }

        // Presumably required because the data region has persistence enabled - confirm.
        server.active(true);

        // Spark Configuration.
        SparkConf sparkConf = new SparkConf()
            .setAppName("JavaIgniteRDDExample")
            .setMaster("local")
            .set("spark.executor.instances", "2");

        // Spark context.
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        // Adjust the logger to exclude the logs of no interest.
        Logger.getRootLogger().setLevel(Level.ERROR);
        Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);

        // Creates Ignite context with specific configuration and runs Ignite in the embedded mode.
        // NOTE(review): the third constructor argument is 'true'; if that is the 'standalone' flag
        // mentioned in the class Javadoc, this actually runs in standalone mode - confirm.
        JavaIgniteContext<UUID, Integer> igniteContext = new JavaIgniteContext<UUID, Integer>(
            sparkContext,
            new IgniteOutClosure<IgniteConfiguration>() {
                @Override public IgniteConfiguration apply() {
                    return createIgniteCfg();
                }
            },
            true);

        // Create a Java Ignite RDD of (UUID, Integer) pairs backed by the "sharedRDD" cache.
        JavaIgniteRDD<UUID, Integer> sharedRDD = igniteContext.<UUID, Integer>fromCache("sharedRDD");

        long start = System.currentTimeMillis();

        long totalLoaded = 0;

        // Keep loading for 55 seconds, i.e. just under the one-minute expiry configured for the cache.
        while(System.currentTimeMillis() - start < 55_000) {
            // Define data to be stored in the Ignite RDD (cache): the integers 0..19_999.
            List<Integer> data = new ArrayList<>(20_000);

            for (int i = 0; i < 20_000; i++)
                data.add(i);

            // Preparing a Java RDD.
            JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);

            sharedRDD.saveValues(javaRDD);

            totalLoaded += 20_000;
        }

        System.out.println("Loaded " + totalLoaded);

        // Endless loop: repeatedly iterate over the whole cache and print how many entries remain.
        for (;;) {
            System.out.println(">>> Iterating over Ignite Shared RDD...");

            IgniteCache<Object, Object> cache = server.getOrCreateCache("sharedRDD");

            AtomicLong recordsLeft = new AtomicLong(0);

            for (Cache.Entry<Object, Object> entry : cache) {
                recordsLeft.incrementAndGet();
            }

            System.out.println("Left: " + recordsLeft.get());
        }

        // Close IgniteContext on all the workers.
        // igniteContext.close(true);
    }

    /**
     * Builds the Ignite configuration shared by the server nodes and the Spark-side Ignite context.
     * <p>
     * The returned configuration:
     * <ul>
     * <li>is in client mode ({@code main} switches it to server mode for the server nodes);</li>
     * <li>uses a default data region with persistence enabled, a checkpoint page buffer of
     * {@code 16 * 1024 * 1024} and a max size of {@code 8 * 16 * 1024 * 1024};</li>
     * <li>uses TCP discovery with a VM IP finder pointing at {@code localhost:47500..47600};</li>
     * <li>declares the {@code sharedRDD} cache: ATOMIC, PARTITIONED, one backup, FULL_SYNC,
     * eager TTL and a {@link CreatedExpiryPolicy} of one minute.</li>
     * </ul>
     *
     * @return A new {@link IgniteConfiguration} instance on every call.
     */
    private static IgniteConfiguration createIgniteCfg() {
        IgniteConfiguration cfg = new IgniteConfiguration();

        cfg.setClientMode(true);

        DataStorageConfiguration memCfg = new DataStorageConfiguration()
            .setDefaultDataRegionConfiguration(
                new DataRegionConfiguration()
                    .setCheckpointPageBufferSize(16 * 1024 * 1024)
                    .setMaxSize(8 * 16 * 1024 * 1024)
                    .setPersistenceEnabled(true));

        cfg.setDataStorageConfiguration(memCfg);

        TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(false);

        finder.setAddresses(Arrays.asList("localhost:47500..47600"));

        cfg.setDiscoverySpi(
            new TcpDiscoverySpi().setIpFinder(finder));

        CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>("sharedRDD");

        cacheCfg.setAtomicityMode(ATOMIC);
        cacheCfg.setCacheMode(PARTITIONED);
        cacheCfg.setBackups(1);
        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
        cacheCfg.setEagerTtl(true);

        // Entries expire one minute after creation.
        cacheCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));

        cfg.setCacheConfiguration(cacheCfg);

        return cfg;
    }
}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)