Hi Val,
Let me explain it again. What I am looking for is to build a cache for each
Spark application and destroy it when the Spark app completes.
Something like what you have with IgniteRDD in embedded mode. I can't use
IgniteRDD because it would put us in a nested-RDD situation with our legacy
code.
I have changed my code a little. Please have a look and let me know if
it looks OK.
1) val workers = sparkContext.getConf.getInt("spark.executor.instances",
     sparkContext.getExecutorStorageStatus.length)

   // Start an Ignite server node on each worker (one partition per worker).
   sparkContext.parallelize(1 to workers, workers).foreachPartition(it => ignite())

   def ignite(): Ignite = {
     val cfg = new IgniteConfiguration()
     val ignite: Ignite = Ignition.start(cfg)
     logInfo("Starting Ignite node")
     ignite
   }
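To cover the "destroy it when the app completes" part, I am thinking of
running something like this at the end of the job. A rough sketch; like
step 1 it assumes one partition lands on each executor, which I know Spark
does not strictly guarantee:

   // Rough sketch: stop the embedded server node on each executor once the app is done.
   // Ignition.stop(true) cancels any running jobs and stops the default Ignite instance.
   sparkContext.parallelize(1 to workers, workers).foreachPartition(_ => Ignition.stop(true))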
2) val dataframe = sqlContext.read.format("com.databricks.spark.avro")
     .option("header", "true")
     .load(sparkConf.get("spark.data.avroFiles"))

   val entityRDD = dataframe.map(/* deserialize and save */)
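(The deserialize step above is just a placeholder for our legacy logic.
Roughly it does something like this; the field names here are made up for
illustration and match the trimmed EntityVO sketch at the end of this mail:)

   // Hypothetical sketch of the deserialize step (real field names differ):
   val entityRDD = dataframe.map { row =>
     val vo = new EntityVO
     vo.Id = row.getAs[String]("id")                       // becomes the cache key
     vo.canonicalName = row.getAs[String]("canonicalName") // the field queried in step 4
     vo
   }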
3) Load each partition into the cache:

   entityRDD.mapPartitions(it => {
     val cfg = new IgniteConfiguration()
     Ignition.setClientMode(true)
     val ignite: Ignite = Ignition.getOrStart(cfg)

     val orgCacheCfg: CacheConfiguration[String, EntityVO] =
       new CacheConfiguration[String, EntityVO](MyCache)
     orgCacheCfg.setIndexedTypes(classOf[String], classOf[EntityVO])
     orgCacheCfg.setCacheMode(CacheMode.PARTITIONED)

     val cache: IgniteCache[String, EntityVO] = ignite.getOrCreateCache(orgCacheCfg)
     while (it.hasNext) {
       val entityVO = it.next()
       cache.put(entityVO.Id, entityVO)
     }
     it // iterator is exhausted here; count() below just forces evaluation
   }).count()
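One thing I was also wondering about: would it be better to bulk-load each
partition through IgniteDataStreamer instead of calling cache.put() per
entry? A sketch of what I mean (it assumes the cache has already been
created as above, inside the same mapPartitions closure):

   // Sketch: bulk-load a partition with a data streamer instead of per-entry put().
   val streamer = ignite.dataStreamer[String, EntityVO](MyCache)
   try {
     while (it.hasNext) {
       val entityVO = it.next()
       streamer.addData(entityVO.Id, entityVO)
     }
   } finally {
     streamer.close() // flushes any buffered entries to the cache
   }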
4) Use the cache for lookups:

   entityNamesRDD.map(entityName => {
     val cfg = new IgniteConfiguration()
     Ignition.setClientMode(true)
     val ignite: Ignite = Ignition.getOrStart(cfg)

     val cacheConfig: CacheConfiguration[String, EntityVO] =
       new CacheConfiguration[String, EntityVO](MyCache)
     cacheConfig.setIndexedTypes(classOf[String], classOf[EntityVO])
     val wcaIgniteCache: IgniteCache[String, EntityVO] = ignite.getOrCreateCache(cacheConfig)

     val queryString = "canonicalName = ?"
     val companyNameQuery =
       new SqlQuery[String, EntityVO]("EntityVO", queryString).setArgs(entityName)
     val results = wcaIgniteCache.query(companyNameQuery).getAll()

     val compResults = ListBuffer[EntityVO]()
     val listIter = results.listIterator()
     while (listIter.hasNext) {
       val compObject = listIter.next()
       if (compObject.getValue.isInstanceOf[EntityVO])
         compResults += compObject.getValue.asInstanceOf[EntityVO]
     }
     compResults.toVector
   }).collect().foreach(println)
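For the SQL lookup in step 4, I understand canonicalName has to be exposed
as a query field on the value class. This is roughly how my EntityVO looks
(a trimmed sketch; the real class has more fields):

   import org.apache.ignite.cache.query.annotations.QuerySqlField

   // Trimmed sketch of the value class; only the fields used above are shown.
   class EntityVO extends Serializable {
     @QuerySqlField(index = true)
     var Id: String = _

     @QuerySqlField(index = true)
     var canonicalName: String = _
   }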
Thanks,
Ranjit
On Tue, Feb 14, 2017 at 3:08 AM, vkulichenko <[email protected]>
wrote:
> Hi Ranjit,
>
> Not sure I understood. The main problem with executors is that they are
> controlled by Spark and they are created per application. So you can't
> share
> the data stored in embedded mode and it's not really safe to store it
> there.
> This can be useful only for some simple tests/demos, but not for real apps.
> Let me know if I'm missing something in your use case.
>
> -Val
>
>
>
> --
> View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Help-needed-tp10540p10607.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>