SPARK-8813 - combining small files in spark sql

2016-07-06 Thread Ajay Srivastava
Hi,
This JIRA, https://issues.apache.org/jira/browse/SPARK-8813, is fixed in Spark 2.0, but the resolution is not mentioned there.
In our use case, there are big as well as many small Parquet files being queried using Spark SQL. Can someone please explain what the fix is and how I can use it in Spark 2.0? I searched the commits in the 2.0 branch, and it looks like I need to use spark.sql.files.openCostInBytes, but I am not sure.


Regards,
Ajay
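
A minimal sketch of how these settings are applied in Spark 2.0 (the second setting, the values, and the input path are assumptions for illustration and are not from the thread):

    import org.apache.spark.sql.SparkSession

    // Spark 2.0 packs many small files into fewer read partitions.
    // maxPartitionBytes caps the bytes packed into one partition, and
    // openCostInBytes is the estimated cost of opening a file, so very small
    // files get grouped together instead of each becoming its own partition.
    val spark = SparkSession.builder()
      .appName("small-files-sketch")
      .config("spark.sql.files.maxPartitionBytes", "134217728")  // 128 MB per partition
      .config("spark.sql.files.openCostInBytes", "4194304")      // each file open counted as ~4 MB
      .getOrCreate()

    val df = spark.read.parquet("hdfs:///data/events")   // hypothetical path
    println(df.rdd.getNumPartitions)                      // fewer partitions than input files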


Re: Data locality across jobs

2015-04-03 Thread Ajay Srivastava
You can read the same partition from every hour's output, union these RDDs, and then repartition them into a single partition. This would be done for all partitions, one by one. It may not necessarily improve performance; that will depend on the size of the spills in the job when all the data was processed together.
Regards,
Ajay
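
A rough sketch of the idea above (the paths, the hourly layout, and the part-file naming are assumptions for illustration only):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("merge-hourly-partitions"))

    // Each hourly job is assumed to have written part-00000, part-00001, ...
    // under its own directory.
    val hours = (0 until 24).map(h => f"hdfs:///output/2015-04-01/hour=$h%02d")
    val partitionIndex = 0   // repeat the same steps for each partition index

    // Read only this partition's file from every hour, union them, and collapse
    // the result back into a single partition.
    val parts  = hours.map(dir => sc.textFile(f"$dir/part-$partitionIndex%05d"))
    val merged = parts.reduce(_ union _).repartition(1)
    merged.saveAsTextFile(f"hdfs:///output/daily/2015-04-01/key-group-$partitionIndex%05d")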
 


On Friday, April 3, 2015 2:01 AM, Sandy Ryza wrote:
   

 This isn't currently a capability that Spark has, though it has definitely 
been discussed: https://issues.apache.org/jira/browse/SPARK-1061.  The primary 
obstacle at this point is that Hadoop's FileInputFormat doesn't guarantee that 
each file corresponds to a single split, so the records corresponding to a 
particular partition at the end of the first job can end up split across 
multiple partitions in the second job.
-Sandy
On Wed, Apr 1, 2015 at 9:09 PM, kjsingh  wrote:

Hi,

We are running an hourly job using Spark 1.2 on Yarn. It saves an RDD of
Tuple2. At the end of day, a daily job is launched, which works on the
outputs of the hourly jobs.

For data locality and speed, we wish that when the daily job launches, it
finds all instances of a given key at a single executor rather than fetching
it from others during shuffle.

Is it possible to maintain key partitioning across jobs? We can control
partitioning in one job. But how do we send keys to the executors of same
node manager across jobs? And while saving data to HDFS, are the blocks
allocated to the same data node machine as the executor for a partition?
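
For the "control partitioning in one job" part, a sketch like the following is what is usually meant (illustrative only; per the reply above, this does not by itself guarantee that the next job finds each key on the same node):

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    val sc = new SparkContext(new SparkConf().setAppName("hourly-job-sketch"))

    // Hypothetical input: key the records by their first comma-separated field.
    val hourly = sc.textFile("hdfs:///input/2015-04-01/hour=00")
      .map(line => (line.split(',')(0), line))

    // Co-locate all records for a key in one partition before writing.
    hourly
      .partitionBy(new HashPartitioner(64))
      .saveAsTextFile("hdfs:///output/2015-04-01/hour=00")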




Re: Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Thanks Nicos. GC does not contribute much to the execution time of the task. I will debug it further today.
Regards,
Ajay
 

On Thursday, January 15, 2015 11:55 PM, Nicos wrote:

Ajay,
Unless we are dealing with some synchronization/conditional variable bug in Spark, try this per the tuning guide:

Cache Size Tuning
One important configuration parameter for GC is the amount of memory that should be used for caching RDDs. By default, Spark uses 60% of the configured executor memory (spark.executor.memory) to cache RDDs. This means that 40% of memory is available for any objects created during task execution. In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of memory, lowering this value will help reduce the memory consumption. To change this to, say, 50%, you can call conf.set("spark.storage.memoryFraction", "0.5") on your SparkConf. Combined with the use of serialized caching, using a smaller cache should be sufficient to mitigate most of the garbage collection problems. In case you are interested in further tuning the Java GC, continue reading below.

The complete list of tips is here:
https://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage

Cheers,
- Nicos
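
A minimal sketch of the two knobs mentioned above (the value and the input path are assumptions for illustration):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("gc-tuning-sketch")
      .set("spark.storage.memoryFraction", "0.5")   // shrink the RDD cache from the 60% default
    val sc = new SparkContext(conf)

    // Serialized caching keeps cached partitions as byte arrays, which is
    // smaller and much cheaper for the garbage collector to manage.
    val rdd = sc.textFile("hdfs:///data/input")
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)
    rdd.count()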

On Jan 15, 2015, at 6:49 AM, Ajay Srivastava wrote:

Thanks RK. I can turn on speculative execution, but I am trying to find out the actual reason for the delay, as it happens on any node. Any idea about the stack trace in my previous mail?
Regards,
Ajay
 

On Thursday, January 15, 2015 8:02 PM, RK wrote:

If you don't want a few slow tasks to slow down the entire job, you can turn on speculation.
Here are the speculation settings from the Spark Configuration - Spark 1.2.0 documentation:
spark.speculation (default: false) - If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
spark.speculation.interval (default: 100) - How often Spark will check for tasks to speculate, in milliseconds.
spark.speculation.quantile (default: 0.75) - Percentage of tasks which must be complete before speculation is enabled for a particular stage.
spark.speculation.multiplier (default: 1.5) - How many times slower a task is than the median to be considered for speculation.
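
Illustrative only: how these settings would be applied on a SparkConf, using the values from the table above:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("speculation-sketch")
      .set("spark.speculation", "true")             // re-launch straggling tasks
      .set("spark.speculation.interval", "100")     // check for stragglers every 100 ms
      .set("spark.speculation.quantile", "0.75")    // wait for 75% of tasks to finish first
      .set("spark.speculation.multiplier", "1.5")   // 1.5x slower than the median counts as slow
    val sc = new SparkContext(conf)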

  

On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava wrote:

Hi,
My Spark job is taking a long time. I see that some tasks are taking longer for the same amount of data and shuffle read/write. What could be the possible reasons for it?
The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace -
"Executor task launch worker-12" daemon prio=10 tid=0x7fcd44276000 
nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x7fd0aee82e00> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
 Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rd

Re: Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Thanks RK. I can turn on speculative execution, but I am trying to find out the actual reason for the delay, as it happens on any node. Any idea about the stack trace in my previous mail?
Regards,
Ajay
 

On Thursday, January 15, 2015 8:02 PM, RK wrote:

If you don't want a few slow tasks to slow down the entire job, you can turn on speculation.
Here are the speculation settings from the Spark Configuration - Spark 1.2.0 documentation:
spark.speculation (default: false) - If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
spark.speculation.interval (default: 100) - How often Spark will check for tasks to speculate, in milliseconds.
spark.speculation.quantile (default: 0.75) - Percentage of tasks which must be complete before speculation is enabled for a particular stage.
spark.speculation.multiplier (default: 1.5) - How many times slower a task is than the median to be considered for speculation.

  

On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava wrote:

Hi,
My Spark job is taking a long time. I see that some tasks are taking longer for the same amount of data and shuffle read/write. What could be the possible reasons for it?
The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace -
"Executor task launch worker-12" daemon prio=10 tid=0x7fcd44276000 
nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x7fd0aee82e00> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
 Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
    
Any inputs/suggestions to improve the job time will be appreciated.
Regards,
Ajay





   

Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Hi,
My Spark job is taking a long time. I see that some tasks are taking longer for the same amount of data and shuffle read/write. What could be the possible reasons for it?
The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace -
"Executor task launch worker-12" daemon prio=10 tid=0x7fcd44276000 
nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x7fd0aee82e00> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
 Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
    
Any inputs/suggestions to improve the job time will be appreciated.
Regards,
Ajay



Re: Creating RDD from only few columns of a Parquet file

2015-01-13 Thread Ajay Srivastava
Setting spark.sql.hive.convertMetastoreParquet to true has fixed this.

Regards,
Ajay
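
A rough sketch of the combination described in this thread (the setting name comes from the message above; the SparkContext, table name, and column names are assumptions for illustration):

    import org.apache.spark.sql.hive.HiveContext

    val sqlContext = new HiveContext(sc)   // assumes an existing SparkContext `sc`
    sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

    // With the native Parquet support enabled, only the referenced columns
    // should be read from disk, since Parquet is a columnar format.
    val result = sqlContext.sql("SELECT col1, col2, col3 FROM table1")
    result.count()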

On Tuesday, January 13, 2015 11:50 AM, Ajay Srivastava wrote:

Hi,
I am trying to read a parquet file using -
    val parquetFile = sqlContext.parquetFile("people.parquet")

There is no way to specify that I am interested in reading only some columns from disk. For example, if the Parquet file has 10 columns and I want to read only 3 columns from disk.

We have done an experiment -
Table1 - Parquet file containing 10 columns
Table2 - Parquet file containing only 3 columns which were used in query 

The time taken by the query on Table1 and Table2 shows a huge difference. The query on Table1 takes more than double the time taken on Table2, which makes me think that Spark is reading all the columns from disk for Table1 when it needs only 3 columns.

How should I make sure that it reads only 3 of the 10 columns from disk?


Regards,
Ajay




Creating RDD from only few columns of a Parquet file

2015-01-12 Thread Ajay Srivastava
Hi,
I am trying to read a parquet file using -
    val parquetFile = sqlContext.parquetFile("people.parquet")

There is no way to specify that I am interested in reading only some columns from disk. For example, if the Parquet file has 10 columns and I want to read only 3 columns from disk.

We have done an experiment -
Table1 - Parquet file containing 10 columns
Table2 - Parquet file containing only 3 columns which were used in query 

The time taken by the query on Table1 and Table2 shows a huge difference. The query on Table1 takes more than double the time taken on Table2, which makes me think that Spark is reading all the columns from disk for Table1 when it needs only 3 columns.

How should I make sure that it reads only 3 of the 10 columns from disk?


Regards,
Ajay
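
An illustrative sketch of the projection approach (Spark 1.2-era API; the column names are made up): selecting only the needed columns should let the Parquet scan prune the rest.

    val parquetFile = sqlContext.parquetFile("people.parquet")
    parquetFile.registerTempTable("people")

    // Only these three columns need to be read from disk for this query.
    val threeCols = sqlContext.sql("SELECT name, age, city FROM people")
    threeCols.count()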


Spark summit 2014 videos ?

2014-07-10 Thread Ajay Srivastava
Hi,
I did not find any videos on the Apache Spark channel on YouTube yet.
Any idea when these will be made available?


Regards,
Ajay


Re: OFF_HEAP storage level

2014-07-04 Thread Ajay Srivastava
Thanks Jerry.
It looks like a good option, will try it.

Regards,
Ajay



On Friday, July 4, 2014 2:18 PM, "Shao, Saisai" wrote:


Hi Ajay,

StorageLevel OFF_HEAP means you can cache your RDD in Tachyon; the prerequisite is that you deploy Tachyon alongside Spark.

Yes, it can alleviate GC, since you offload data from JVM memory into system-managed memory.

You can use rdd.persist(...) to use this level; details can be checked in BlockManager.scala, TachyonBlockManager and TachyonStore.

StorageLevel NONE means the RDD will not be cached, and if you want to use this RDD again, it will be re-computed from the source to get the data.

Thanks,
Jerry
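
A minimal sketch of using the level (assumes a Tachyon deployment is available to the cluster and an existing SparkContext `sc`; the path is hypothetical):

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.textFile("hdfs:///data/input")
    rdd.persist(StorageLevel.OFF_HEAP)   // cached off-heap in Tachyon instead of the executor JVM heap
    rdd.count()                          // the first action materializes the cache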
 
From: Ajay Srivastava [mailto:a_k_srivast...@yahoo.com]
Sent: Friday, July 04, 2014 2:19 PM
To: user@spark.apache.org
Subject: OFF_HEAP storage level
Hi,
I was checking the different storage levels of an RDD and found OFF_HEAP.
Has anybody used this level?
If I use this level, where will the data be stored? If it is not in the heap, does it mean that we can avoid GC?
How can I use this level? I did not find anything in the archive regarding this.
Can someone also explain the behavior of the storage level NONE?


Regards,
Ajay

OFF_HEAP storage level

2014-07-03 Thread Ajay Srivastava
Hi,
I was checking the different storage levels of an RDD and found OFF_HEAP.
Has anybody used this level?
If I use this level, where will the data be stored? If it is not in the heap, does it mean that we can avoid GC?
How can I use this level? I did not find anything in the archive regarding this.
Can someone also explain the behavior of the storage level NONE?


Regards,
Ajay


Re: Join : Giving incorrect result

2014-06-06 Thread Ajay Srivastava


Thanks Matei. We have tested the fix and it's working perfectly.

Andrew, we set spark.shuffle.spill=false, but the application goes out of memory. I think that is expected.

Regards,
Ajay
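
For reference, a minimal sketch of the setting Andrew suggests trying below, applied to the SparkConf from the code snippet later in this thread (illustrative only):

    val sparkConf = new org.apache.spark.SparkConf()
      .setAppName("join-repro")
      .set("spark.shuffle.spill", "false")   // keep shuffle maps in memory; bypasses the buggy spill path
    val sc = new org.apache.spark.SparkContext(sparkConf)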


On Friday, June 6, 2014 3:49 AM, Andrew Ash wrote:
 


Hi Ajay,

Can you please try running the same code with spark.shuffle.spill=false and see 
if the numbers turn out correctly?  That parameter controls whether or not the 
buggy code that Matei fixed in ExternalAppendOnlyMap is used.

FWIW I saw similar issues in 0.9.0 but no longer in 0.9.1 after I think some 
fixes in spilling landed.

Andrew



On Thu, Jun 5, 2014 at 3:05 PM, Matei Zaharia wrote:

Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in 
the way join tasks spill to disk (which happened when you had more concurrent 
tasks competing for memory). I’ve posted a patch for it here: 
https://github.com/apache/spark/pull/986. Feel free to try that if you’d like; 
it will also be in 0.9.2 and 1.0.1.
>
>
>Matei
>
>
>On Jun 5, 2014, at 12:19 AM, Ajay Srivastava  wrote:
>
>Sorry for replying late. It was night here.
>>
>>
>>Lian/Matei,
>>Here is the code snippet -
>>    sparkConf.set("spark.executor.memory", "10g")
>>    sparkConf.set("spark.cores.max", "5")
>>    
>>    val sc = new SparkContext(sparkConf)
>>    
>>    val accId2LocRDD = 
>>sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_,
>> 0, ',', true))
>>  
>>    val accId2DemoRDD = 
>>sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_,
>> 0, ',', true))
>>    
>>    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)
>>
>>
>>  def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, 
>>retFullLine: Boolean): Tuple2[String, String] = {
>>    val splits = line.split(delimit)
>>    if (splits.length <= 1) {
>>  (null, null)
>>    } else if (retFullLine) {
>>  (splits(keyIndex), line)
>>    } else{
>>        (splits(keyIndex), splits(splits.length-keyIndex-1))
>>    }
>>  }
>>
>>    
>>
>>Both of these files have 10 M records with same unique keys. Size of the file 
>>is nearly 280 MB and block size in hdfs is 256 MB. The output of join should 
>>contain 10 M records.
>>
>>
>>
>>We have done some more experiments -
>>1) Running cogroup instead of join - it also gives incorrect count.
>>2) Running union followed by groupbykey and then filtering records with two 
>>entries in sequence - It also gives incorrect count.
>>3) Increase spark.executor.memory to 50 g and everything works fine. Count 
>>comes 10 M for join,cogroup and union/groupbykey/filter transformations.
>>
>>
>>
>>I thought that 10g is enough memory for executors but even if the memory is 
>>less it should not result in incorrect computation. Probably there is a 
>>problem in reconstructing RDDs when memory is not enough. 
>>
>>
>>
>>Thanks Chen for your observation. I get this problem on single worker so 
>>there will not be any mismatch of jars. On two workers, since executor memory 
>>gets doubled the code works fine.
>>
>>
>>
>>Regards,
>>Ajay
>>
>>
>>
>>
>>On Thursday, June 5, 2014 1:35 AM, Matei Zaharia  
>>wrote:
>> 
>>
>>
>>If this isn’t the problem, it would be great if you can post the code for the 
>>program.
>>
>>
>>Matei
>>
>>
>>
>>On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen  wrote:
>>
>>Maybe your two workers have different assembly jar files?
>>>I just ran into a similar problem that my spark-shell is using a different 
>>>jar file than my workers - got really confusing results.
>>>On Jun 4, 2014 8:33 AM, "Ajay Srivastava"  wrote:
>>>
>>>Hi,
>>>>
>>>>
>>>>I am doing join of two RDDs which giving different results ( counting 
>>>>number of records ) each time I run this code on same input.
>>>>
>>>>
>>>>The input files are large enough to be divided in two splits. When the 
>>>>program runs on two workers with single core assigned to these, output is 
>>>>consistent and looks correct. But when single worker is used with two or 
>>>>more than two cores, the result seems to be random. Every time, count of 
>>>>joined record is different.
>>>>
>>>>
>>>>Does this sound like a defect or I need to take care of something while 
>>>>using join ? I am using spark-0.9.1.
>>>>
>>>>
>>>>
>>>>Regards
>>>>Ajay
>>
>>
>>
>

Re: Join : Giving incorrect result

2014-06-05 Thread Ajay Srivastava
Sorry for replying late. It was night here.

Lian/Matei,
Here is the code snippet -
    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")
    
    val sc = new SparkContext(sparkConf)
    
    val accId2LocRDD = 
sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_,
 0, ',', true))
  
    val accId2DemoRDD = 
sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_,
 0, ',', true))
    
    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

  def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, 
retFullLine: Boolean): Tuple2[String, String] = {
    val splits = line.split(delimit)
    if (splits.length <= 1) {
  (null, null)
    } else if (retFullLine) {
  (splits(keyIndex), line)
    } else{
        (splits(keyIndex), splits(splits.length-keyIndex-1))
    }
  }

    

Both of these files have 10 M records with the same unique keys. The size of each file is nearly 280 MB and the block size in HDFS is 256 MB. The output of the join should contain 10 M records.


We have done some more experiments -
1) Running cogroup instead of join - it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering records with two entries in sequence - it also gives an incorrect count.
3) Increasing spark.executor.memory to 50 g - everything works fine. The count comes to 10 M for the join, cogroup and union/groupByKey/filter transformations.


I thought that 10 g is enough memory for the executors, but even if the memory is less, it should not result in incorrect computation. Probably there is a problem in reconstructing RDDs when memory is not enough.


Thanks Chen for your observation. I get this problem on a single worker, so there will not be any mismatch of jars. On two workers, since the executor memory gets doubled, the code works fine.


Regards,
Ajay



On Thursday, June 5, 2014 1:35 AM, Matei Zaharia wrote:
 


If this isn’t the problem, it would be great if you can post the code for the 
program.

Matei



On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen  wrote:

Maybe your two workers have different assembly jar files?
>I just ran into a similar problem that my spark-shell is using a different jar 
>file than my workers - got really confusing results.
>On Jun 4, 2014 8:33 AM, "Ajay Srivastava"  wrote:
>
>Hi,
>>
>>
>>I am doing join of two RDDs which giving different results ( counting number 
>>of records ) each time I run this code on same input.
>>
>>
>>The input files are large enough to be divided in two splits. When the 
>>program runs on two workers with single core assigned to these, output is 
>>consistent and looks correct. But when single worker is used with two or more 
>>than two cores, the result seems to be random. Every time, count of joined 
>>record is different.
>>
>>
>>Does this sound like a defect or I need to take care of something while using 
>>join ? I am using spark-0.9.1.
>>
>>
>>
>>Regards
>>Ajay

Join : Giving incorrect result

2014-06-04 Thread Ajay Srivastava
Hi,

I am doing a join of two RDDs which gives different results (counting the number of records) each time I run this code on the same input.

The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random. Every time, the count of joined records is different.

Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1.


Regards
Ajay