Re: Join : Giving incorrect result

2014-06-06 Thread Ajay Srivastava


Thanks Matei. We have tested the fix and it's working perfectly.

Andrew, we set spark.shuffle.spill=false but the application goes out of 
memory. I think that is expected, since with spilling disabled the shuffle data has to fit entirely in memory.

Regards,
Ajay



Re: Join : Giving incorrect result

2014-06-05 Thread Andrew Ash
Hi Ajay,

Can you please try running the same code with spark.shuffle.spill=false and
see if the numbers turn out correctly?  That parameter controls whether or
not the buggy code that Matei fixed in ExternalAppendOnlyMap is used.
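
For reference, one way to set it is on the SparkConf before the SparkContext is
created (a minimal sketch; the app name is illustrative and the flag defaults to true):

    import org.apache.spark.{SparkConf, SparkContext}

    // Disable shuffle spilling for this test run only. Expect higher memory
    // use, since shuffle data must then fit entirely in memory.
    val sparkConf = new SparkConf()
      .setAppName("join-spill-test")
      .set("spark.shuffle.spill", "false")
    val sc = new SparkContext(sparkConf)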

FWIW I saw similar issues in 0.9.0 but no longer in 0.9.1, after (I think) some
fixes to spilling landed.

Andrew




Re: Join : Giving incorrect result

2014-06-05 Thread Matei Zaharia
Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in 
the way join tasks spill to disk (which happened when you had more concurrent 
tasks competing for memory). I’ve posted a patch for it here: 
https://github.com/apache/spark/pull/986. Feel free to try that if you’d like; 
it will also be in 0.9.2 and 1.0.1.

Matei
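
For readers who have not looked at that code: ExternalAppendOnlyMap is the map
Spark uses to aggregate shuffle/co-group data and spill it to disk when memory
runs low. The toy class below is only a simplified sketch of that general
technique, not Spark's implementation (the class name and on-disk format are
made up); it shows why a bug in the merge step of such a structure can silently
drop or duplicate records, which matches the symptom in this thread.

    import java.io._
    import scala.collection.mutable

    // Toy "external" map: accumulate (key, count) pairs in memory, spill to a
    // temp file when a threshold is reached, and merge spilled files with the
    // in-memory contents when results are read back.
    class ToySpillableMap(maxInMemory: Int) {
      private val inMemory = mutable.Map.empty[String, Long]
      private val spillFiles = mutable.ArrayBuffer.empty[File]

      def insert(key: String, value: Long): Unit = {
        inMemory(key) = inMemory.getOrElse(key, 0L) + value
        if (inMemory.size >= maxInMemory) spill()
      }

      private def spill(): Unit = {
        val file = File.createTempFile("toy-spill", ".tmp")
        val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
        try {
          out.writeInt(inMemory.size)
          inMemory.foreach { case (k, v) => out.writeUTF(k); out.writeLong(v) }
        } finally out.close()
        spillFiles += file
        inMemory.clear()
      }

      // The correctness-critical step: every spilled entry must be merged back
      // exactly once with the in-memory data.
      def result(): Map[String, Long] = {
        val merged = mutable.Map.empty[String, Long] ++ inMemory
        for (file <- spillFiles) {
          val in = new DataInputStream(new BufferedInputStream(new FileInputStream(file)))
          try {
            val n = in.readInt()
            var i = 0
            while (i < n) {
              val k = in.readUTF(); val v = in.readLong()
              merged(k) = merged.getOrElse(k, 0L) + v
              i += 1
            }
          } finally in.close()
        }
        merged.toMap
      }
    }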




Re: Join : Giving incorrect result

2014-06-05 Thread Ajay Srivastava
Sorry for replying late. It was night here.

Lian/Matei,
Here is the code snippet -
    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")
    
    val sc = new SparkContext(sparkConf)
    
    val accId2LocRDD = 
sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_,
 0, ',', true))
  
    val accId2DemoRDD = 
sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_,
 0, ',', true))
    
    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

  def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, 
retFullLine: Boolean): Tuple2[String, String] = {
    val splits = line.split(delimit)
    if (splits.length <= 1) {
  (null, null)
    } else if (retFullLine) {
  (splits(keyIndex), line)
    } else{
        (splits(keyIndex), splits(splits.length-keyIndex-1))
    }
  }

    

Both files have 10 M records with the same unique keys. Each file is nearly 
280 MB, and the HDFS block size is 256 MB. The output of the join should 
contain 10 M records.
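
For context, here is what the getKeyValueFromString helper above produces on a
couple of made-up sample lines (the field values are assumptions, not taken
from the actual data):

    // With keyIndex = 0, delimit = ',' and retFullLine = true, the first field
    // becomes the key and the whole line becomes the value:
    getKeyValueFromString("acct-42,US,CA", 0, ',', true)
    // => ("acct-42", "acct-42,US,CA")

    // With retFullLine = false, the value is the field mirrored from the end:
    getKeyValueFromString("acct-42,planA", 0, ',', false)
    // => ("acct-42", "planA")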


We have done some more experiments -
1) Running cogroup instead of join - it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering keys whose grouped 
sequence contains one record from each input (see the sketch below) - it also 
gives an incorrect count.
3) Increasing spark.executor.memory to 50g - everything works fine, and the 
count comes out to 10 M for the join, cogroup, and union/groupByKey/filter versions.
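
A sketch of experiment 2, assuming the RDD names from the snippet above and
that org.apache.spark.SparkContext._ is imported for the pair-RDD operations;
this is one plausible reading of the description, not necessarily the exact
code that was run:

    // Tag each side, union, group by key, and keep only keys that received a
    // record from both inputs; the count should again be 10 M.
    val tagged = accId2LocRDD.mapValues(v => ("loc", v))
      .union(accId2DemoRDD.mapValues(v => ("demo", v)))

    val joinedViaGroup = tagged.groupByKey().filter { case (_, values) =>
      values.map(_._1).toSet == Set("loc", "demo")
    }
    println(s"count via union/groupByKey/filter: ${joinedViaGroup.count()}")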


I thought that 10g was enough memory for the executors, but even if memory is 
insufficient it should not lead to incorrect results. Probably there is a 
problem in reconstructing RDDs when memory is not enough.


Thanks Chen for your observation. I get this problem on a single worker, so 
there cannot be any mismatch of jars. On two workers, since the total executor 
memory doubles, the code works fine.


Regards,
Ajay




Re: Join : Giving incorrect result

2014-06-04 Thread Matei Zaharia
If this isn’t the problem, it would be great if you can post the code for the 
program.

Matei




Re: Join : Giving incorrect result

2014-06-04 Thread Xu (Simon) Chen
Maybe your two workers have different assembly jar files?

I just ran into a similar problem where my spark-shell was using a different
jar file than my workers - I got really confusing results.
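
If it helps, here is a rough way to check that from the shell (a diagnostic
sketch, assuming sc is the SparkContext provided by spark-shell):

    // Where was the SparkContext class loaded from on the driver?
    val driverJar = classOf[org.apache.spark.SparkContext]
      .getProtectionDomain.getCodeSource.getLocation
    println(s"driver:   $driverJar")

    // Run the same lookup inside a task to see what the executors loaded.
    val executorJars = sc.parallelize(1 to 1, 1).map { _ =>
      classOf[org.apache.spark.SparkContext]
        .getProtectionDomain.getCodeSource.getLocation.toString
    }.collect().distinct
    executorJars.foreach(j => println(s"executor: $j"))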


Re: Join : Giving incorrect result

2014-06-04 Thread Cheng Lian
Hi Ajay, would you mind synthesizing a minimal code snippet that reproduces
this issue and pasting it here?
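
Something along these lines might serve as a starting point (a self-contained
sketch; the key format, sizes, partition counts, and the local[4] master are
assumptions chosen to mimic the single-worker, multi-core setup):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // pair-RDD implicits (needed on 0.9.x)

    object JoinCountRepro {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("JoinCountRepro").setMaster("local[4]")
        val sc = new SparkContext(conf)

        val n = 10000000  // 10 M distinct keys, as in the original report
        val left  = sc.parallelize(1 to n, 8).map(i => (s"acct-$i", "location"))
        val right = sc.parallelize(1 to n, 8).map(i => (s"acct-$i", "planType"))

        val joined = left.join(right).count()
        println(s"expected=$n joined=$joined")  // should be identical on every run
        sc.stop()
      }
    }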




Join : Giving incorrect result

2014-06-04 Thread Ajay Srivastava
Hi,

I am doing a join of two RDDs, and it gives a different result (when counting 
the number of records) each time I run this code on the same input.

The input files are large enough to be divided into two splits. When the program 
runs on two workers with a single core assigned to each, the output is consistent 
and looks correct. But when a single worker is used with two or more cores, the 
result seems to be random: every run produces a different count of joined records.

Does this sound like a defect, or do I need to take care of something while using 
join? I am using spark-0.9.1.


Regards
Ajay