SPARK-8813 - combining small files in Spark SQL
Hi, This JIRA https://issues.apache.org/jira/browse/SPARK-8813 is marked as fixed in Spark 2.0, but the resolution is not described there. In our use case, there are large as well as many small Parquet files being queried through Spark SQL. Can someone please explain what the fix is and how I can use it in Spark 2.0? I searched the commits on the 2.0 branch and it looks like I need to use spark.sql.files.openCostInBytes, but I am not sure. Regards, Ajay
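For reference, a minimal sketch of how these settings can be applied in Spark 2.0. The path and values below are illustrative only; spark.sql.files.maxPartitionBytes is the companion setting that controls the target size of each read partition.

// Sketch only: tuning how Spark 2.0 packs small Parquet files into read partitions.
// The path and the values below are illustrative, not from the original thread.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SmallFileCoalescing")
  .getOrCreate()

// Target size of each read partition (default 128 MB).
spark.conf.set("spark.sql.files.maxPartitionBytes", (128 * 1024 * 1024).toString)
// Estimated cost of opening a file, expressed in bytes; a higher value makes
// Spark pack more small files into a single partition.
spark.conf.set("spark.sql.files.openCostInBytes", (8 * 1024 * 1024).toString)

val df = spark.read.parquet("hdfs:///data/parquet/mixed_sizes")
// Fewer partitions than input files once small files are packed together.
println(df.rdd.getNumPartitions)

Roughly, Spark 2.0 bin-packs files into a read partition until maxPartitionBytes is reached, counting each file as at least openCostInBytes, so raising openCostInBytes groups more small files into one task.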
Re: Data locality across jobs
You can read the same partition from every hour's output, union these RDDs, and then repartition them into a single partition. This would be done for all partitions, one by one. It may not necessarily improve performance; that will depend on the size of the spills in the job when all the data was processed together. Regards, Ajay

On Friday, April 3, 2015 2:01 AM, Sandy Ryza wrote:
This isn't currently a capability that Spark has, though it has definitely been discussed: https://issues.apache.org/jira/browse/SPARK-1061. The primary obstacle at this point is that Hadoop's FileInputFormat doesn't guarantee that each file corresponds to a single split, so the records corresponding to a particular partition at the end of the first job can end up split across multiple partitions in the second job. -Sandy

On Wed, Apr 1, 2015 at 9:09 PM, kjsingh wrote:
Hi, We are running an hourly job using Spark 1.2 on YARN. It saves an RDD of Tuple2. At the end of the day, a daily job is launched which works on the outputs of the hourly jobs. For data locality and speed, we wish that when the daily job launches, it finds all instances of a given key at a single executor rather than fetching them from others during shuffle. Is it possible to maintain key partitioning across jobs? We can control partitioning within one job, but how do we send keys to the executors of the same node manager across jobs? And while saving data to HDFS, are the blocks allocated to the same data node machine as the executor for a partition?
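A minimal sketch of the union-and-repartition approach described above, assuming the hourly jobs saved an RDD[(String, String)] with saveAsObjectFile; the paths and the partition count are illustrative.

// Sketch: re-partitioning hourly outputs so the daily job sees each key in one partition.
// Assumes each hourly job wrote an RDD[(String, String)] with saveAsObjectFile; paths are hypothetical.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits, needed on Spark 1.2

val sc = new SparkContext(new SparkConf().setAppName("DailyRollup"))

// One path per hourly output; adjust to the real directory layout.
val hourlyPaths = (0 until 24).map(h => f"hdfs:///data/hourly/2015-04-01/hour=$h%02d")
val hourlyRdds  = hourlyPaths.map(p => sc.objectFile[(String, String)](p))

// Union all hours, then hash-partition by key so every record for a given key
// lands in the same partition (and hence the same task) of the daily job.
val daily = sc.union(hourlyRdds).partitionBy(new HashPartitioner(200))

// Placeholder for the real daily processing.
println(daily.count())

Note that, as Sandy points out, this still shuffles inside the daily job: it groups each key into a single partition there, but it does not carry executor or HDFS locality over from the hourly jobs.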
Re: Some tasks are taking long time
Thanks Nicos. GC does not contribute much to the execution time of the task. I will debug it further today. Regards, Ajay

On Thursday, January 15, 2015 11:55 PM, Nicos wrote:
Ajay, unless we are dealing with some synchronization/conditional-variable bug in Spark, try this per the tuning guide:

Cache Size Tuning
One important configuration parameter for GC is the amount of memory that should be used for caching RDDs. By default, Spark uses 60% of the configured executor memory (spark.executor.memory) to cache RDDs. This means that 40% of memory is available for any objects created during task execution. In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of memory, lowering this value will help reduce the memory consumption. To change this to, say, 50%, you can call conf.set("spark.storage.memoryFraction", "0.5") on your SparkConf. Combined with the use of serialized caching, using a smaller cache should be sufficient to mitigate most of the garbage collection problems. In case you are interested in further tuning the Java GC, continue reading below. Complete list of tips here: https://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage
Cheers, - Nicos

On Jan 15, 2015, at 6:49 AM, Ajay Srivastava wrote:
Thanks RK. I can turn on speculative execution, but I am trying to find out the actual reason for the delay, as it happens on any node. Any idea about the stack trace in my previous mail? Regards, Ajay

On Thursday, January 15, 2015 8:02 PM, RK wrote:
If you don't want a few slow tasks to slow down the entire job, you can turn on speculation. Here are the speculation settings from the Spark Configuration - Spark 1.2.0 documentation:

spark.speculation (default: false) - If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
spark.speculation.interval (default: 100) - How often Spark will check for tasks to speculate, in milliseconds.
spark.speculation.quantile (default: 0.75) - Percentage of tasks which must be complete before speculation is enabled for a particular stage.
spark.speculation.multiplier (default: 1.5) - How many times slower a task is than the median to be considered for speculation.

On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava wrote:
Hi, My Spark job is taking a long time. I see that some tasks are taking longer than others for the same amount of data and shuffle read/write. What could be the possible reasons for it?
The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace -

"Executor task launch worker-12" daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x7fd0aee82e00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(Unknown Source)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
        at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
        at org.apache.spark.rd
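For reference, a sketch of the settings discussed in this thread, using the Spark 1.2-era SparkConf API. The values shown are the documented defaults plus speculation turned on and the cache fraction lowered to 0.5, as Nicos suggests; they are illustrative, not recommendations from the thread.

// Sketch: enabling speculative execution and lowering the RDD cache fraction,
// per the suggestions from RK and Nicos above. Values are illustrative.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("SlowTaskTuning")
  .set("spark.speculation", "true")            // re-launch tasks that run much slower than the median
  .set("spark.speculation.interval", "100")    // check for stragglers every 100 ms
  .set("spark.speculation.quantile", "0.75")   // wait until 75% of tasks finish before speculating
  .set("spark.speculation.multiplier", "1.5")  // a task 1.5x slower than the median is a candidate
  .set("spark.storage.memoryFraction", "0.5")  // leave more headroom for task objects, easing GC pressure

val sc = new SparkContext(conf)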
Re: Some tasks are taking long time
Thanks RK. I can turn on speculative execution, but I am trying to find out the actual reason for the delay, as it happens on any node. Any idea about the stack trace in my previous mail? Regards, Ajay

On Thursday, January 15, 2015 8:02 PM, RK wrote:
If you don't want a few slow tasks to slow down the entire job, you can turn on speculation. Here are the speculation settings from the Spark Configuration - Spark 1.2.0 documentation:

spark.speculation (default: false) - If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
spark.speculation.interval (default: 100) - How often Spark will check for tasks to speculate, in milliseconds.
spark.speculation.quantile (default: 0.75) - Percentage of tasks which must be complete before speculation is enabled for a particular stage.
spark.speculation.multiplier (default: 1.5) - How many times slower a task is than the median to be considered for speculation.

On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava wrote:
Hi, My Spark job is taking a long time. I see that some tasks are taking longer than others for the same amount of data and shuffle read/write. What could be the possible reasons for it? The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace -

"Executor task launch worker-12" daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x7fd0aee82e00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(Unknown Source)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
        at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)

Any inputs/suggestions to improve the job time will be appreciated. Regards, Ajay
Some tasks are taking long time
Hi, My Spark job is taking a long time. I see that some tasks are taking longer than others for the same amount of data and shuffle read/write. What could be the possible reasons for it? The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace -

"Executor task launch worker-12" daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x7fd0aee82e00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(Unknown Source)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
        at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)

Any inputs/suggestions to improve the job time will be appreciated. Regards, Ajay
Re: Creating RDD from only a few columns of a Parquet file
Setting spark.sql.hive.convertMetastoreParquet to true has fixed this. Regards, Ajay

On Tuesday, January 13, 2015 11:50 AM, Ajay Srivastava wrote:
Hi, I am trying to read a Parquet file using -

val parquetFile = sqlContext.parquetFile("people.parquet")

There is no way to specify that I am interested in reading only some columns from disk. For example, the Parquet file has 10 columns and I want to read only 3 columns from disk. We have done an experiment -

Table1 - Parquet file containing 10 columns
Table2 - Parquet file containing only the 3 columns which were used in the query

The time taken by the query on Table1 and Table2 shows a huge difference. The query on Table1 takes more than double the time taken on Table2, which makes me think that Spark is reading all the columns from disk in the case of Table1 when it needs only 3. How should I make sure that it reads only 3 of the 10 columns from disk? Regards, Ajay
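A sketch of the fix described above, using the Spark 1.x SQLContext/HiveContext API; the column names are hypothetical and `sc` is assumed to be an existing SparkContext.

// Sketch: use Spark's native Parquet reader (which prunes columns) for Hive
// metastore Parquet tables, then reference only the columns the query needs.
// Column names are hypothetical.
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

val people = sqlContext.parquetFile("people.parquet")
people.registerTempTable("people")

// Only the referenced columns are read from disk once column pruning applies.
val slim = sqlContext.sql("SELECT name, age, city FROM people")
println(slim.count())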
Creating RDD from only a few columns of a Parquet file
Hi, I am trying to read a Parquet file using -

val parquetFile = sqlContext.parquetFile("people.parquet")

There is no way to specify that I am interested in reading only some columns from disk. For example, the Parquet file has 10 columns and I want to read only 3 columns from disk. We have done an experiment -

Table1 - Parquet file containing 10 columns
Table2 - Parquet file containing only the 3 columns which were used in the query

The time taken by the query on Table1 and Table2 shows a huge difference. The query on Table1 takes more than double the time taken on Table2, which makes me think that Spark is reading all the columns from disk in the case of Table1 when it needs only 3. How should I make sure that it reads only 3 of the 10 columns from disk? Regards, Ajay
Spark Summit 2014 videos?
Hi, I did not find any videos on the Apache Spark channel on YouTube yet. Any idea when these will be made available? Regards, Ajay
Re: OFF_HEAP storage level
Thanks Jerry. It looks like a good option, will try it. Regards, Ajay

On Friday, July 4, 2014 2:18 PM, "Shao, Saisai" wrote:
Hi Ajay, StorageLevel OFF_HEAP means you can cache your RDD in Tachyon; the prerequisite is that you should deploy Tachyon alongside Spark. Yes, it can alleviate GC, since you offload JVM memory into system-managed memory. You can use rdd.persist(…) to use this level; details can be checked in BlockManager.scala, TachyonBlockManager and TachyonStore. StorageLevel NONE means the RDD will not be cached, and if you want to use this RDD again, you should re-compute it from the source to get the data. Thanks, Jerry

From: Ajay Srivastava [mailto:a_k_srivast...@yahoo.com]
Sent: Friday, July 04, 2014 2:19 PM
To: user@spark.apache.org
Subject: OFF_HEAP storage level

Hi, I was checking the different storage levels of an RDD and found OFF_HEAP. Has anybody used this level? If I use this level, where will the data be stored? If not in heap, does it mean that we can avoid GC? How can I use this level? I did not find anything in the archive regarding this. Can someone also explain the behavior of storage level NONE? Regards, Ajay
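A minimal sketch of using this level with the Spark 1.x API, assuming a Tachyon master is already running; the URL and the data are illustrative.

// Sketch: persisting an RDD off-heap (Tachyon-backed in Spark 1.x).
// Assumes a Tachyon master is deployed; the URL below is hypothetical.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("OffHeapDemo")
  .set("spark.tachyonStore.url", "tachyon://tachyon-master:19998")

val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 1000000)
rdd.persist(StorageLevel.OFF_HEAP)  // blocks are stored outside the JVM heap, reducing GC pressure
println(rdd.count())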
OFF_HEAP storage level
Hi, I was checking the different storage levels of an RDD and found OFF_HEAP. Has anybody used this level? If I use this level, where will the data be stored? If not in heap, does it mean that we can avoid GC? How can I use this level? I did not find anything in the archive regarding this. Can someone also explain the behavior of storage level NONE? Regards, Ajay
Re: Join : Giving incorrect result
Thanks Matei. We have tested the fix and it's working perfectly. Andrew, we set spark.shuffle.spill=false but the application goes out of memory. I think that is expected. Regards, Ajay

On Friday, June 6, 2014 3:49 AM, Andrew Ash wrote:
Hi Ajay, can you please try running the same code with spark.shuffle.spill=false and see if the numbers turn out correctly? That parameter controls whether or not the buggy code that Matei fixed in ExternalAppendOnlyMap is used. FWIW I saw similar issues in 0.9.0 but no longer in 0.9.1, after I think some fixes in spilling landed. Andrew

On Thu, Jun 5, 2014 at 3:05 PM, Matei Zaharia wrote:
Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in the way join tasks spill to disk (which happened when you had more concurrent tasks competing for memory). I've posted a patch for it here: https://github.com/apache/spark/pull/986. Feel free to try that if you'd like; it will also be in 0.9.2 and 1.0.1. Matei

On Jun 5, 2014, at 12:19 AM, Ajay Srivastava wrote:
Sorry for replying late. It was night here.

Lian/Matei, here is the code snippet -

sparkConf.set("spark.executor.memory", "10g")
sparkConf.set("spark.cores.max", "5")

val sc = new SparkContext(sparkConf)

val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_, 0, ',', true))

val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_, 0, ',', true))

val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, retFullLine: Boolean): Tuple2[String, String] = {
  val splits = line.split(delimit)
  if (splits.length <= 1) {
    (null, null)
  } else if (retFullLine) {
    (splits(keyIndex), line)
  } else {
    (splits(keyIndex), splits(splits.length - keyIndex - 1))
  }
}

Both of these files have 10 M records with the same unique keys. The size of each file is nearly 280 MB and the block size in HDFS is 256 MB. The output of the join should contain 10 M records.

We have done some more experiments -
1) Running cogroup instead of join - it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering records with two entries in the sequence - it also gives an incorrect count.
3) Increasing spark.executor.memory to 50 g - everything works fine. The count comes to 10 M for the join, cogroup and union/groupByKey/filter transformations.

I thought that 10g is enough memory for the executors, but even if the memory is less it should not result in incorrect computation. Probably there is a problem in reconstructing RDDs when memory is not enough.

Thanks Chen for your observation. I get this problem on a single worker, so there will not be any mismatch of jars. On two workers, since executor memory gets doubled, the code works fine.

Regards, Ajay

On Thursday, June 5, 2014 1:35 AM, Matei Zaharia wrote:
If this isn't the problem, it would be great if you can post the code for the program. Matei

On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen wrote:
Maybe your two workers have different assembly jar files? I just ran into a similar problem where my spark-shell was using a different jar file than my workers - got really confusing results.

On Jun 4, 2014 8:33 AM, "Ajay Srivastava" wrote:
Hi, I am doing a join of two RDDs which is giving different results (counting the number of records) each time I run this code on the same input. The input files are large enough to be divided in two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random - every time, the count of joined records is different. Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1. Regards, Ajay
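For reference, a sketch of the spark.shuffle.spill test that Andrew suggests. The values are illustrative; with spilling disabled the shuffle must fit in executor memory, which matches the out-of-memory behaviour Ajay reports above.

// Sketch: disabling shuffle spilling to check whether the ExternalAppendOnlyMap
// spill path in Spark 0.9.0/0.9.1 is the cause of the wrong join counts.
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  .setAppName("JoinCountCheck")
  .set("spark.shuffle.spill", "false")     // bypass the buggy spill code path
  .set("spark.executor.memory", "10g")     // illustrative; the join must now fit in memory

val sc = new SparkContext(sparkConf)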
Re: Join : Giving incorrect result
Sorry for replying late. It was night here.

Lian/Matei, here is the code snippet -

sparkConf.set("spark.executor.memory", "10g")
sparkConf.set("spark.cores.max", "5")

val sc = new SparkContext(sparkConf)

val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_, 0, ',', true))

val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_, 0, ',', true))

val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, retFullLine: Boolean): Tuple2[String, String] = {
  val splits = line.split(delimit)
  if (splits.length <= 1) {
    (null, null)
  } else if (retFullLine) {
    (splits(keyIndex), line)
  } else {
    (splits(keyIndex), splits(splits.length - keyIndex - 1))
  }
}

Both of these files have 10 M records with the same unique keys. The size of each file is nearly 280 MB and the block size in HDFS is 256 MB. The output of the join should contain 10 M records.

We have done some more experiments -
1) Running cogroup instead of join - it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering records with two entries in the sequence - it also gives an incorrect count.
3) Increasing spark.executor.memory to 50 g - everything works fine. The count comes to 10 M for the join, cogroup and union/groupByKey/filter transformations.

I thought that 10g is enough memory for the executors, but even if the memory is less it should not result in incorrect computation. Probably there is a problem in reconstructing RDDs when memory is not enough.

Thanks Chen for your observation. I get this problem on a single worker, so there will not be any mismatch of jars. On two workers, since executor memory gets doubled, the code works fine.

Regards, Ajay

On Thursday, June 5, 2014 1:35 AM, Matei Zaharia wrote:
If this isn't the problem, it would be great if you can post the code for the program. Matei

On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen wrote:
Maybe your two workers have different assembly jar files? I just ran into a similar problem where my spark-shell was using a different jar file than my workers - got really confusing results.

On Jun 4, 2014 8:33 AM, "Ajay Srivastava" wrote:
Hi, I am doing a join of two RDDs which is giving different results (counting the number of records) each time I run this code on the same input. The input files are large enough to be divided in two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random - every time, the count of joined records is different. Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1. Regards, Ajay
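A small sanity check that could be run on top of the snippet above. It reuses the RDD names from the snippet; the check itself is not from the thread, just one way to see whether records are being lost or duplicated.

// Sketch: since both inputs are said to have 10 M records with the same unique
// keys, all three counts below should match when the join is correct.
// Assumes `import org.apache.spark.SparkContext._` is in scope, as needed for
// the pair-RDD operations used in the snippet above.
val leftKeys   = accId2LocRDD.keys.distinct().count()
val rightKeys  = accId2DemoRDD.keys.distinct().count()
val joinedRows = joinedRDD.count()

println(s"left keys = $leftKeys, right keys = $rightKeys, joined rows = $joinedRows")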
Join : Giving incorrect result
Hi, I am doing a join of two RDDs which is giving different results (counting the number of records) each time I run this code on the same input. The input files are large enough to be divided in two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random - every time, the count of joined records is different. Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1. Regards, Ajay