One more point to add: in my case the query will start from an executor node,
not the driver node.

On Tue, Feb 14, 2017 at 2:42 PM, Ranjit Sahu <[email protected]> wrote:

> Hi Val,
>
> Let me explain it again. What I am looking for is to build the cache for
> each Spark application and destroy it when the Spark app completes,
> something like what you have with IgniteRDD in embedded mode. I can't use
> IgniteRDD because we get into a nested RDD situation with our legacy
> code.
>
> I have changed my code a little bit now. Please have a look and let me know
> if this looks OK.
>
>
> 1) val workers = sparkContext.getConf.getInt("spark.executor.instances",
>      sparkContext.getExecutorStorageStatus.length)
>
>     // Start an Ignite server node on each worker.
>     sparkContext.parallelize(1 to workers, workers).foreachPartition(it => ignite())
>
>     def ignite(): Ignite = {
>       val cfg = new IgniteConfiguration()
>       val ignite: Ignite = Ignition.start(cfg)
>       logInfo("Starting ignite node")
>       ignite
>     }
>
> 2) val dataframe = sqlContext.read.format("com.databricks.spark.avro")
>      .option("header", "true").load(sparkConf.get("spark.data.avroFiles"))
>
>    val EntityRDD = dataframe.map(Deserialize and save)
>
> 3) Load each partition to cache
>
>   EntityRDD.mapPartitions(x => {
>
>      val cfg = new IgniteConfiguration();
>      Ignition.setClientMode(true);
>      val ignite:Ignite = Ignition.getOrStart(cfg)
>       val orgCacheCfg: CacheConfiguration[String, EntityVO] =
>         new CacheConfiguration[String, EntityVO](MyCache)
>       orgCacheCfg.setIndexedTypes(classOf[String], classOf[EntityVO])
>       orgCacheCfg.setCacheMode(CacheMode.PARTITIONED)
>       val cache: IgniteCache[String, EntityVO] = ignite.getOrCreateCache(orgCacheCfg)
>        while(x.hasNext){
>         val entityvo = x.next()
>         cache.put(entityvo.Id,entityvo)
>       }
>       x
>     }).count()
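
A note on step 3: the while loop drains `x` and the function then returns the same iterator, so the partition result is empty and `count()` only serves to force evaluation. The sketch below shows that behaviour with plain Scala iterators. It is an illustration only: `fakeCache` is a hypothetical stand-in for the Ignite cache, and `loadAndReturnSame` / `loadAndReturnCount` are made-up names, not part of the code above.

```scala
import scala.collection.mutable

object ExhaustedIteratorSketch {
  // Hypothetical stand-in for the Ignite cache, so the sketch runs without a cluster.
  val fakeCache: mutable.Map[String, Int] = mutable.Map.empty

  // Same shape as step 3: drain the iterator, then return that same iterator.
  def loadAndReturnSame(it: Iterator[(String, Int)]): Iterator[(String, Int)] = {
    while (it.hasNext) {
      val (key, value) = it.next()
      fakeCache.put(key, value)
    }
    it // already exhausted here, so downstream sees no elements
  }

  // Alternative: return how many entries this partition wrote.
  def loadAndReturnCount(it: Iterator[(String, Int)]): Iterator[Int] = {
    var written = 0
    it.foreach { case (key, value) =>
      fakeCache.put(key, value)
      written += 1
    }
    Iterator.single(written)
  }

  def main(args: Array[String]): Unit = {
    val data = List("a" -> 1, "b" -> 2, "c" -> 3)
    println(loadAndReturnSame(data.iterator).size) // prints 0
    println(loadAndReturnCount(data.iterator).sum) // prints 3
  }
}
```

If the number of loaded entries matters, returning a per-partition count as in the second function is one option; if the action is only there to trigger the load, the original form still works.
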
>
> 4) Use the cache for lookup:
>
>  enitityNamesRDD.map(entityName => {
>
>        val cfg = new IgniteConfiguration();
>
>        Ignition.setClientMode(true);
>
>        val ignite:Ignite = Ignition.getOrStart(cfg)
>        val cacheConfig: CacheConfiguration[String, EntityVO] =
>          new CacheConfiguration[String, EntityVO](MyCache)
>        cacheConfig.setIndexedTypes(classOf[String], classOf[EntityVO])
>        val wcaIngiteCache: IgniteCache[String, EntityVO] =
>          ignite.getOrCreateCache(cacheConfig)
>        val queryString = "canonicalName = ?"
>        val companyNameQuery =
>          new SqlQuery[String, EntityVO]("EntityVO", queryString).setArgs(entityName)
>        val results = wcaIngiteCache.query(companyNameQuery).getAll()
>        val listIter = results.listIterator()
>        val compResults = ListBuffer[EntityVO]()
>        while(listIter.hasNext){
>          val compObject = listIter.next()
>          if(compObject.getValue.isInstanceOf[EntityVO])
>            compResults += compObject.getValue.asInstanceOf[EntityVO]
>        }
>          compResults.toVector
>
>    }).collect().foreach(println)
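
The loop in step 4 that copies query results into a `ListBuffer` can probably be written more compactly with the Java/Scala collection converters. This is a minimal sketch under assumptions: `java.util.Map.Entry` stands in for the cache entry type returned by the query, and `EntityVO` here is a hypothetical two-field case class, not the real class. On Scala 2.13 and later, `scala.jdk.CollectionConverters._` would replace the `JavaConverters` import.

```scala
import java.util.{AbstractMap, ArrayList, List => JList, Map => JMap}
import scala.collection.JavaConverters._

object QueryResultsSketch {
  // Hypothetical stand-in for the real EntityVO class.
  final case class EntityVO(id: String, canonicalName: String)

  // Extract the values from a Java list of key/value entries, skipping null
  // values (the original isInstanceOf check also rejects nulls).
  def valuesOf(results: JList[JMap.Entry[String, EntityVO]]): Vector[EntityVO] =
    results.asScala.map(_.getValue).filter(_ != null).toVector

  def main(args: Array[String]): Unit = {
    val results = new ArrayList[JMap.Entry[String, EntityVO]]()
    results.add(new AbstractMap.SimpleEntry[String, EntityVO]("1", EntityVO("1", "Acme")))
    results.add(new AbstractMap.SimpleEntry[String, EntityVO]("2", null))
    println(valuesOf(results))
  }
}
```
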
>
>
>
>
> Thanks,
> Ranjit
>
>
>
>
> On Tue, Feb 14, 2017 at 3:08 AM, vkulichenko <[email protected]> wrote:
>
>> Hi Ranjit,
>>
>> Not sure I understood. The main problem with executors is that they are
>> controlled by Spark and they are created per application. So you can't
>> share the data stored in embedded mode and it's not really safe to store
>> it there. This can be useful only for some simple tests/demos, but not
>> for real apps. Let me know if I'm missing something in your use case.
>>
>> -Val
>>
>>
>>
>> --
>> View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Help-needed-tp10540p10607.html
>> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>>
>
>
