Thanks vkulichenko
I will describe my issue in detail below. I run the same code in two
scenarios, but the first one gives the correct result and the second one
does not. I have been studying Ignite for the last few days but can't get a
correct result on this. Hopefully someone can help me.
==
1. Run Ignite with spark-shell
1) ./spark-shell --jars /u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-core-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-spark/ignite-spark-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/cache-api-1.0.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-log4j/ignite-log4j-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-log4j/log4j-1.2.17.jar --packages org.apache.ignite:ignite-spark:1.6.0,org.apache.ignite:ignite-spring:1.6.0
2) Run the following code in spark-shell:

val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
val initalRDD = sc.parallelize(1 to 10, 10).map(i => (i, i))
println("initalRDD.counter=/. " + initalRDD.count() + "\t partitionCounter=> " + initalRDD.partitions.size)
//sharedRDD.saveValues(initalRDD.map(line => line._1))
sharedRDD.savePairs(initalRDD, true) // overwrite cache on Ignite
println("=>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
println("=>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 5).count)
3) Result:

scala> import org.apache.ignite.spark._
import org.apache.ignite.spark._

scala> import org.apache.ignite.configuration._
import org.apache.ignite.configuration._

scala> val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
ic: org.apache.ignite.spark.IgniteContext[Int,Int] = org.apache.ignite.spark.IgniteContext@74e72ff4

scala> val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
sharedRDD: org.apache.ignite.spark.IgniteRDD[Int,Int] = IgniteRDD[1] at RDD at IgniteAbstractRDD.scala:31

scala> val initalRDD = sc.parallelize(1 to 10, 10).map(i => (i, i))
initalRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at :33

scala> println("initalRDD.counter=/. " + initalRDD.count() + "\t partitionCounter=> " + initalRDD.partitions.size)
initalRDD.counter=/. 10 partitionCounter=> 10

scala> sharedRDD.savePairs(initalRDD, true) // overwrite cache on Ignite

scala> println("=>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
=>totalIgniteEmbedCounter10 igniteParitionCounter => 1024

scala> println("=>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 5).count)
=>totalIgniteFilterConditionEmbedCounter5

totalIgniteEmbedCounter is 10, which is correct.
totalIgniteFilterConditionEmbedCounter is 5, which is correct.
==
2. IDEA project
1) Create a Maven project in IDEA.
2) Import the Ignite Maven dependencies as above [1].
3) Code for the IDEA project:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext

object TestIgniteEmbedCache {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TestIgniteEmbedCache")
    val sc = new SparkContext(conf)
    //val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration().setIncludeEventTypes(EventType.EVT_TASK_FAILED), false)
    val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
    val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
    val initalRDD = sc.parallelize(1 to 10, 10).map(i => (i, i))
    println("initalRDD.counter=/. " + initalRDD.count() + "\t partitionCounter=> " + initalRDD.partitions.size)
    //sharedRDD.saveValues(initalRDD.map(line => line._1))
    sharedRDD.savePairs(initalRDD, true) // overwrite cache on Ignite
    println("=>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
    println("=>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 5).count)
  }
}
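For reference, my understanding (please correct me if wrong) is that the third IgniteContext argument selects the deployment mode: false is embedded mode, where every Spark executor starts an Ignite server node holding cache data, while true is standalone mode, where Spark connects as a client to an already-running Ignite cluster. A minimal sketch of the same job in standalone mode, assuming external Ignite server nodes are running (the discovery addresses and object name below are placeholders I made up):

```scala
import java.util.Arrays
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

object TestIgniteStandaloneCache {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("TestIgniteStandaloneCache"))

    // Configuration factory: point discovery at the external Ignite
    // server nodes (the hostname below is a placeholder).
    val cfg = () => {
      val ipFinder = new TcpDiscoveryVmIpFinder()
      ipFinder.setAddresses(Arrays.asList("ignite-host1:47500..47509"))
      new IgniteConfiguration().setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder))
    }

    // Third argument true = standalone mode: Spark attaches to the
    // external cluster, so cache data does not live in executor JVMs.
    val ic = new IgniteContext[Int, Int](sc, cfg, true)
    val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
    sharedRDD.savePairs(sc.parallelize(1 to 10, 10).map(i => (i, i)), true)
    println("count = " + sharedRDD.count)
  }
}
```

This is only a sketch of how I read the API, not a verified fix.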
4) Run mvn clean assembly:assembly to build sparkignitedemo.jar.
5) Upload this jar to our Linux driver machine and submit it to the YARN
cluster with the spark-submit command below:
/u01/spark-1.6.0-hive/bin/spark-submit --driver-memory 8G --class com.TestIgniteEmbedCache --master yarn --executor-cores 5 --executor-memory 1000m --num-executors 10 --conf spark.rdd.compress=false --conf spark.shuffle.compress=false --conf spark.broadcast.compress=false /home/sparkignitedemo.jar
6) Result (this is the issue):
totalIgniteEmbedCounter is 4, or 3000 (it seems random)
totalIgniteFilterConditionEmbedCounter is 1, or 2000 (random)
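One thing that might explain the unstable counts (an assumption on my part, not verified): with the third argument set to false, the cache data lives inside the executor JVMs, so if executors join late or are restarted by YARN between savePairs and count, the Ignite topology changes and the counts could differ between runs. A small diagnostic sketch that could be added to the driver code above, to correlate the counts with topology (it assumes the same ic and cache name as above):

```scala
// Hypothetical diagnostic: print the topology and cache size the driver
// sees, to check whether the unstable counts coincide with node churn.
val ignite = ic.ignite()
println("Ignite nodes visible to driver: " + ignite.cluster().nodes().size())
println("Cache size via Ignite API: " + ignite.cache[Int, Int]("sharedBaselineCacheRDD").size())
```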
==
This result really confuses me: why does the same code produce two
different results? Can anyone help me with this? I have been blocked on
this issue for several days.