[ 
https://issues.apache.org/jira/browse/IGNITE-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Zinoviev updated IGNITE-7523:
------------------------------------
    Affects Version/s:     (was: 2.3)
                       2.9

> Exception on data expiration after sharedRDD.saveValues call
> ------------------------------------------------------------
>
>                 Key: IGNITE-7523
>                 URL: https://issues.apache.org/jira/browse/IGNITE-7523
>             Project: Ignite
>          Issue Type: Bug
>          Components: spark
>    Affects Versions: 2.9
>            Reporter: Mikhail Cherkasov
>            Assignee: Alexey Zinoviev
>            Priority: Critical
>             Fix For: 2.9
>
>
> Reproducer:
> {code:java}
> package rdd_expiration;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> import java.util.UUID;
> import java.util.concurrent.atomic.AtomicLong;
> import javax.cache.Cache;
> import javax.cache.expiry.CreatedExpiryPolicy;
> import javax.cache.expiry.Duration;
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.Ignition;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.DataRegionConfiguration;
> import org.apache.ignite.configuration.DataStorageConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.lang.IgniteOutClosure;
> import org.apache.ignite.spark.JavaIgniteContext;
> import org.apache.ignite.spark.JavaIgniteRDD;
> import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
> import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
> import org.apache.log4j.Level;
> import org.apache.log4j.Logger;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
> import static org.apache.ignite.cache.CacheMode.PARTITIONED;
> import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
> /**
> * This example demonstrates how to create a JavaIgniteRDD and share it with
> multiple Spark workers. The goal of this
> * particular example is to provide the simplest code example of this logic.
> * <p>
> * This example will start Ignite in the embedded mode and will start a
> JavaIgniteContext on each Spark worker node.
> * <p>
> * The example can also work in the standalone mode, which is enabled by
> setting JavaIgniteContext's
> * {@code standalone} property to {@code true} and running an Ignite node
> separately with
> * the `examples/config/spark/example-shared-rdd.xml` config.
> */
> public class RddExpiration {
> /**
> * Executes the example.
> * @param args Command line arguments, none required.
> */
> public static void main(String args[]) throws InterruptedException {
> Ignite server = null;
> for (int i = 0; i < 4; i++) {
> IgniteConfiguration serverCfg = createIgniteCfg();
> serverCfg.setClientMode(false);
> serverCfg.setIgniteInstanceName("Server" + i);
> server = Ignition.start(serverCfg);
> }
> server.active(true);
> // Spark Configuration.
> SparkConf sparkConf = new SparkConf()
> .setAppName("JavaIgniteRDDExample")
> .setMaster("local")
> .set("spark.executor.instances", "2");
> // Spark context.
> JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
> // Adjust the logger to exclude the logs of no interest.
> Logger.getRootLogger().setLevel(Level.ERROR);
> Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);
> // Creates Ignite context with specific configuration and runs Ignite in the 
> embedded mode.
> JavaIgniteContext<UUID, Integer> igniteContext = new JavaIgniteContext<UUID, 
> Integer>(
> sparkContext,
> new IgniteOutClosure<IgniteConfiguration>() {
> @Override public IgniteConfiguration apply() {
> return createIgniteCfg();
> }
> },
> true);
> // Create a Java Ignite RDD of (UUID, Integer) key-value pairs.
> JavaIgniteRDD<UUID, Integer> sharedRDD = igniteContext.<UUID, 
> Integer>fromCache("sharedRDD");
> long start = System.currentTimeMillis();
> long totalLoaded = 0;
> while(System.currentTimeMillis() - start < 55_000) {
> // Define data to be stored in the Ignite RDD (cache).
> List<Integer> data = new ArrayList<>(20_000);
> for (int i = 0; i < 20_000; i++)
> data.add(i);
> // Preparing a Java RDD.
> JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);
> sharedRDD.saveValues(javaRDD);
> totalLoaded += 20_000;
> }
> System.out.println("Loaded " + totalLoaded);
> for (;;) {
> System.out.println(">>> Iterating over Ignite Shared RDD...");
> IgniteCache<Object, Object> cache = server.getOrCreateCache("sharedRDD");
> AtomicLong recordsLeft = new AtomicLong(0);
> for (Cache.Entry<Object, Object> entry : cache) {
> recordsLeft.incrementAndGet();
> }
> System.out.println("Left: " + recordsLeft.get());
> }
> // Close IgniteContext on all the workers.
> // igniteContext.close(true);
> }
> private static IgniteConfiguration createIgniteCfg() {
> IgniteConfiguration cfg = new IgniteConfiguration();
> cfg.setClientMode(true);
> DataStorageConfiguration memCfg = new DataStorageConfiguration()
> .setDefaultDataRegionConfiguration(
> new DataRegionConfiguration()
> .setCheckpointPageBufferSize(16 * 1024 * 1024)
> .setMaxSize(8 * 16 * 1024 * 1024)
> .setPersistenceEnabled(true));
> cfg.setDataStorageConfiguration(memCfg);
> TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(false);
> finder.setAddresses(Arrays.asList("localhost:47500..47600"));
> cfg.setDiscoverySpi( new TcpDiscoverySpi().setIpFinder(finder));
> CacheConfiguration<Object, Object> cacheCfg = new 
> CacheConfiguration<>("sharedRDD");
> cacheCfg.setAtomicityMode(ATOMIC);
> cacheCfg.setCacheMode(PARTITIONED);
> cacheCfg.setBackups(1);
> cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
> cacheCfg.setEagerTtl(true);
> cacheCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));
> cfg.setCacheConfiguration(cacheCfg);
> return cfg;
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to