Hi Val,

Let me explain it again. What I am looking for is to build a cache for
each Spark application and destroy it when the Spark app completes.
Something like what you have with IgniteRDD in embedded mode. I can't use
IgniteRDD because we would end up in a nested-RDD situation with our
legacy code.

I have changed my code a little bit. Please have a look and let me know
if this looks OK.


1) Start an Ignite server node on each worker:

    // Number of executors, falling back to the current storage status count.
    val workers = sparkContext.getConf.getInt("spark.executor.instances",
      sparkContext.getExecutorStorageStatus.length)

    // Start an Ignite server node on each worker.
    sparkContext.parallelize(1 to workers, workers).foreachPartition(_ => ignite())

    def ignite(): Ignite = {
      val cfg = new IgniteConfiguration()
      logInfo("Starting Ignite node")
      Ignition.start(cfg)
    }

2) Read the Avro files and deserialize each row into an entity object:

    val dataframe = sqlContext.read.format("com.databricks.spark.avro")
      .option("header", "true")
      .load(sparkConf.get("spark.data.avroFiles"))

    val EntityRDD = dataframe.map(/* deserialize and save */)

3) Load each partition into the cache:

    EntityRDD.mapPartitions(it => {
      // Connect to the cluster as a client node from each partition.
      Ignition.setClientMode(true)
      val cfg = new IgniteConfiguration()
      val ignite: Ignite = Ignition.getOrStart(cfg)

      val orgCacheCfg: CacheConfiguration[String, EntityVO] =
        new CacheConfiguration[String, EntityVO](MyCache)
      orgCacheCfg.setCacheMode(CacheMode.PARTITIONED)
      orgCacheCfg.setIndexedTypes(classOf[String], classOf[EntityVO])

      val cache: IgniteCache[String, EntityVO] = ignite.getOrCreateCache(orgCacheCfg)
      while (it.hasNext) {
        val entityvo = it.next()
        cache.put(entityvo.Id, entityvo)
      }
      // The iterator is exhausted here; count() below only triggers the job.
      it
    }).count()
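As a side note, if loading turns out to be slow, an IgniteDataStreamer is
usually much faster than per-entry put() for bulk loading. A minimal sketch
of that variant of step 3 (assuming the cache was already created as above;
I haven't run this against your setup):

```scala
// Hypothetical bulk-load variant of step 3 using IgniteDataStreamer.
// Assumes MyCache, EntityVO and EntityRDD are defined as in the snippets above,
// and that the cache itself has already been created (getOrCreateCache).
EntityRDD.mapPartitions(it => {
  Ignition.setClientMode(true)
  val ignite: Ignite = Ignition.getOrStart(new IgniteConfiguration())

  // The streamer batches entries and routes them directly to the owning nodes.
  val streamer = ignite.dataStreamer[String, EntityVO](MyCache)
  try {
    it.foreach(entityvo => streamer.addData(entityvo.Id, entityvo))
  } finally {
    streamer.close() // flushes any remaining buffered entries
  }
  Iterator.empty
}).count()
```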

4) Use the cache for lookups:

    enitityNamesRDD.map(entityName => {
      Ignition.setClientMode(true)
      val cfg = new IgniteConfiguration()
      val ignite: Ignite = Ignition.getOrStart(cfg)

      val cacheConfig: CacheConfiguration[String, EntityVO] =
        new CacheConfiguration[String, EntityVO](MyCache)
      cacheConfig.setIndexedTypes(classOf[String], classOf[EntityVO])
      val wcaIgniteCache: IgniteCache[String, EntityVO] = ignite.getOrCreateCache(cacheConfig)

      // SQL query against the indexed EntityVO type.
      val queryString = "canonicalName = ?"
      val companyNameQuery =
        new SqlQuery[String, EntityVO]("EntityVO", queryString).setArgs(entityName)
      val results = wcaIgniteCache.query(companyNameQuery).getAll()

      val compResults = ListBuffer[EntityVO]()
      val listIter = results.listIterator()
      while (listIter.hasNext) {
        val compObject = listIter.next()
        if (compObject.getValue.isInstanceOf[EntityVO])
          compResults += compObject.getValue.asInstanceOf[EntityVO]
      }
      compResults.toVector

    }).collect().foreach(println)
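One more piece, since the goal is to destroy the cache when the Spark app
completes: nothing in the snippets above tears the cache down. Here is a
rough sketch of how I am thinking of wiring the cleanup on the driver (the
SparkListener hook and the destroyCache call are my assumption, not tested):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

// Hypothetical cleanup: destroy the Ignite cache when the application ends.
// Registered on the driver; assumes the same MyCache name as above.
sparkContext.addSparkListener(new SparkListener {
  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
    Ignition.setClientMode(true)
    val ignite = Ignition.getOrStart(new IgniteConfiguration())
    ignite.destroyCache(MyCache) // removes the cache cluster-wide
    Ignition.stop(true)          // stop the local client node
  }
})
```

Does this look like a reasonable way to tie the cache lifetime to the app?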




Thanks,
Ranjit




On Tue, Feb 14, 2017 at 3:08 AM, vkulichenko <[email protected]>
wrote:

> Hi Ranjit,
>
> Not sure I understood. The main problem with executors is that they are
> controlled by Spark and they are created per application. So you can't
> share
> the data stored in embedded mode and it's not really safe to store it
> there.
> This can be useful only for some simple tests/demos, but not for real apps.
> Let me know if I'm missing something in your use case.
>
> -Val
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/Help-needed-tp10540p10607.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>
