[ https://issues.apache.org/jira/browse/IGNITE-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexey Zinoviev updated IGNITE-7523: ------------------------------------ Affects Version/s: (was: 2.3) 2.9 > Exception on data expiration after sharedRDD.saveValues call > ------------------------------------------------------------ > > Key: IGNITE-7523 > URL: https://issues.apache.org/jira/browse/IGNITE-7523 > Project: Ignite > Issue Type: Bug > Components: spark > Affects Versions: 2.9 > Reporter: Mikhail Cherkasov > Assignee: Alexey Zinoviev > Priority: Critical > Fix For: 2.9 > > > Reproducer: > {code:java} > package rdd_expiration; > import java.util.ArrayList; > import java.util.Arrays; > import java.util.List; > import java.util.UUID; > import java.util.concurrent.atomic.AtomicLong; > import javax.cache.Cache; > import javax.cache.expiry.CreatedExpiryPolicy; > import javax.cache.expiry.Duration; > import org.apache.ignite.Ignite; > import org.apache.ignite.IgniteCache; > import org.apache.ignite.Ignition; > import org.apache.ignite.configuration.CacheConfiguration; > import org.apache.ignite.configuration.DataRegionConfiguration; > import org.apache.ignite.configuration.DataStorageConfiguration; > import org.apache.ignite.configuration.IgniteConfiguration; > import org.apache.ignite.lang.IgniteOutClosure; > import org.apache.ignite.spark.JavaIgniteContext; > import org.apache.ignite.spark.JavaIgniteRDD; > import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; > import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; > import org.apache.log4j.Level; > import org.apache.log4j.Logger; > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.JavaSparkContext; > import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; > import static org.apache.ignite.cache.CacheMode.PARTITIONED; > import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; > /** > * This example demonstrates how to create an JavaIgnitedRDD and share it with > multiple spark workers. The goal of this > * particular example is to provide the simplest code example of this logic. > * <p> > * This example will start Ignite in the embedded mode and will start an > JavaIgniteContext on each Spark worker node. > * <p> > * The example can work in the standalone mode as well that can be enabled by > setting JavaIgniteContext's > * \{@code standalone} property to \{@code true} and running an Ignite node > separately with > * `examples/config/spark/example-shared-rdd.xml` config. > */ > public class RddExpiration { > /** > * Executes the example. > * @param args Command line arguments, none required. > */ > public static void main(String args[]) throws InterruptedException { > Ignite server = null; > for (int i = 0; i < 4; i++) { > IgniteConfiguration serverCfg = createIgniteCfg(); > serverCfg.setClientMode(false); > serverCfg.setIgniteInstanceName("Server" + i); > server = Ignition.start(serverCfg); > } > server.active(true); > // Spark Configuration. > SparkConf sparkConf = new SparkConf() > .setAppName("JavaIgniteRDDExample") > .setMaster("local") > .set("spark.executor.instances", "2"); > // Spark context. > JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); > // Adjust the logger to exclude the logs of no interest. > Logger.getRootLogger().setLevel(Level.ERROR); > Logger.getLogger("org.apache.ignite").setLevel(Level.INFO); > // Creates Ignite context with specific configuration and runs Ignite in the > embedded mode. > JavaIgniteContext<UUID, Integer> igniteContext = new JavaIgniteContext<UUID, > Integer>( > sparkContext, > new IgniteOutClosure<IgniteConfiguration>() { > @Override public IgniteConfiguration apply() { > return createIgniteCfg(); > } > }, > true); > // Create a Java Ignite RDD of Type (Int,Int) Integer Pair. > JavaIgniteRDD<UUID, Integer> sharedRDD = igniteContext.<UUID, > Integer>fromCache("sharedRDD"); > long start = System.currentTimeMillis(); > long totalLoaded = 0; > while(System.currentTimeMillis() - start < 55_000) { > // Define data to be stored in the Ignite RDD (cache). > List<Integer> data = new ArrayList<>(20_000); > for (int i = 0; i < 20_000; i++) > data.add(i); > // Preparing a Java RDD. > JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data); > sharedRDD.saveValues(javaRDD); > totalLoaded += 20_000; > } > System.out.println("Loaded " + totalLoaded); > for (;;) { > System.out.println(">>> Iterating over Ignite Shared RDD..."); > IgniteCache<Object, Object> cache = server.getOrCreateCache("sharedRDD"); > AtomicLong recordsLeft = new AtomicLong(0); > for (Cache.Entry<Object, Object> entry : cache) { > recordsLeft.incrementAndGet(); > } > System.out.println("Left: " + recordsLeft.get()); > } > // Close IgniteContext on all the workers. > // igniteContext.close(true); > } > private static IgniteConfiguration createIgniteCfg() { > IgniteConfiguration cfg = new IgniteConfiguration(); > cfg.setClientMode(true); > DataStorageConfiguration memCfg = new DataStorageConfiguration() > .setDefaultDataRegionConfiguration( > new DataRegionConfiguration() > .setCheckpointPageBufferSize(16 * 1024 * 1024) > .setMaxSize(8 * 16 * 1024 * 1024) > .setPersistenceEnabled(true)); > cfg.setDataStorageConfiguration(memCfg); > TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(false); > finder.setAddresses(Arrays.asList("localhost:47500..47600")); > cfg.setDiscoverySpi( new TcpDiscoverySpi().setIpFinder(finder)); > CacheConfiguration<Object, Object> cacheCfg = new > CacheConfiguration<>("sharedRDD"); > cacheCfg.setAtomicityMode(ATOMIC); > cacheCfg.setCacheMode(PARTITIONED); > cacheCfg.setBackups(1); > cacheCfg.setWriteSynchronizationMode(FULL_SYNC); > cacheCfg.setEagerTtl(true); > cacheCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ONE_MINUTE)); > cfg.setCacheConfiguration(cacheCfg); > return cfg; > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)