Sorry for replying late. It was night here.

Lian/Matei,
Here is the code snippet -
    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")
    
    val sc = new SparkContext(sparkConf)
    
    val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location")
      .map(getKeyValueFromString(_, 0, ',', true))

    val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType")
      .map(getKeyValueFromString(_, 0, ',', true))
    
    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

  def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, retFullLine: Boolean): Tuple2[String, String] = {
    val splits = line.split(delimit)
    if (splits.length <= 1) {
      (null, null)
    } else if (retFullLine) {
      (splits(keyIndex), line)
    } else {
      (splits(keyIndex), splits(splits.length - keyIndex - 1))
    }
  }

    

Both of these files have 10 M records with the same set of unique keys. Each file is nearly 280 MB and the HDFS block size is 256 MB, so each file is divided into two splits. The output of the join should contain 10 M records.
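
For reference, the count check is roughly the following (just a sketch, using the RDDs from the snippet above):

    // Each side should have 10 M distinct keys, and the join should produce 10 M records.
    val distinctKeys = accId2LocRDD.keys.distinct().count()
    val joinedCount = joinedRDD.count()
    println("distinct keys = " + distinctKeys + ", joined records = " + joinedCount)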


We have done some more experiments (rough sketches of 1) and 2) follow below) -
1) Running cogroup instead of join - it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering the keys that end up with two entries in their sequence - it also gives an incorrect count.
3) Increasing spark.executor.memory to 50g - then everything works fine. The count comes to 10 M for the join, cogroup, and union/groupByKey/filter transformations.
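
The first two variants look roughly like this (a sketch, again using the RDDs from the snippet above):

    // 1) cogroup instead of join: count keys that have entries on both sides.
    val cogroupCount = accId2LocRDD.cogroup(accId2DemoRDD)
      .filter { case (_, (locs, demos)) => locs.nonEmpty && demos.nonEmpty }
      .count()

    // 2) union + groupByKey: keep keys that collected exactly two entries.
    val unionCount = accId2LocRDD.union(accId2DemoRDD)
      .groupByKey()
      .filter { case (_, values) => values.size == 2 }
      .count()

Both counts should be 10 M, but they vary from run to run just like the join count.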


I thought that 10g is enough memory for the executors, but even if the memory is not enough it should not result in an incorrect computation. Probably there is a problem in reconstructing RDD partitions when memory is not enough.


Thanks Chen for your observation. I get this problem on a single worker, so there cannot be any mismatch of jars. On two workers, since the total executor memory gets doubled, the code works fine.


Regards,
Ajay



On Thursday, June 5, 2014 1:35 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote:


If this isn’t the problem, it would be great if you can post the code for the 
program.

Matei



On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen <xche...@gmail.com> wrote:

> Maybe your two workers have different assembly jar files?
>
> I just ran into a similar problem where my spark-shell was using a different jar file than my workers - got really confusing results.
>
> On Jun 4, 2014 8:33 AM, "Ajay Srivastava" <a_k_srivast...@yahoo.com> wrote:
>
>> Hi,
>>
>> I am doing a join of two RDDs which is giving different results (counting the number of records) each time I run this code on the same input.
>>
>> The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random. Every time, the count of joined records is different.
>>
>> Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1.
>>
>> Regards
>> Ajay
