Re: Java 8 vs Scala
If you take the time to actually learn Scala starting from its fundamental concepts AND, quite importantly, get familiar with general functional programming concepts, you'd immediately realize the things you'd really miss going back to Java (8).

On Fri, Jul 17, 2015 at 8:14 AM Wojciech Pituła w.pit...@gmail.com wrote:

IMHO only Scala is an option. Once you're familiar with it you just can't even look at Java code.

On Thu, 16.07.2015 at 07:20, spark user spark_u...@yahoo.com.invalid wrote:

I struggle a lot in Scala: almost 10 days, no improvement. But when I switch to Java 8, things are so smooth, and I used DataFrames with Redshift and Hive and all are looking good. If you are very good in Scala then go with Scala, otherwise Java is the best fit. This is just my opinion, because I am a Java guy.

On Wednesday, July 15, 2015 12:33 PM, vaquar khan vaquar.k...@gmail.com wrote:

My choice is Java 8

On 15 Jul 2015 18:03, Alan Burlison alan.burli...@oracle.com wrote:

On 15/07/2015 08:31, Ignacio Blasco wrote:

The main advantage of using Scala vs Java 8 is being able to use a console

https://bugs.openjdk.java.net/browse/JDK-8043364

-- Alan Burlison
DataFrame from RDD[Row]
Hi,

This is an ugly solution because it requires pulling out a row:

val rdd: RDD[Row] = ...
ctx.createDataFrame(rdd, rdd.first().schema)

Is there a better alternative to get a DataFrame from an RDD[Row], since toDF won't work because Row is not a Product?

Thanks,
Marius
Re: Optimizations
Thanks for your feedback. Yes, I am aware of the stage design, and Silvio, what you are describing is essentially a map-side join, which is not applicable when both RDDs are quite large.

It appears that in rdd.join(...).mapToPair(f), f is piggybacked inside the join stage (right in the reducers, I believe), whereas in rdd.join(...).mapPartitionsToPair(f), f is executed in a different stage. This is surprising, because at least intuitively the difference between mapToPair and mapPartitionsToPair is that the former is about the push model whereas the latter is about polling records out of the iterator (*I suspect there are other technical reasons*). If anyone knows the depths of the problem, it would be of great help.

Best, Marius

On Fri, Jul 3, 2015 at 6:43 PM Silvio Fiorito silvio.fior...@granturing.com wrote:

One thing you could do is a broadcast join. You take your smaller RDD and save it as a broadcast variable. Then run a map operation to perform the join and whatever else you need to do. This will remove a shuffle stage, but you will still have to collect the joined RDD and broadcast it. It all depends on the size of your data whether it's worth it or not.

From: Marius Danciu
Date: Friday, July 3, 2015 at 3:13 AM
To: user
Subject: Optimizations

Hi all,

If I have something like:

rdd.join(...).mapPartitionsToPair(...)

it looks like mapPartitionsToPair runs in a different stage than join. Is there a way to piggyback this computation inside the join stage, such that each result partition after join is passed to the mapPartitionsToPair function, all running in the same stage without any other costs?

Best, Marius
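The broadcast-join idea Silvio describes can be sketched without Spark at all: collect the small side into a map (the "broadcast variable") and join in a single map pass over the large side, so no shuffle is needed. The class name and the fruit/color data below are made up for illustration.

```java
import java.util.*;
import java.util.stream.*;

public class BroadcastJoinSketch {

    // Map-side join: the small table is available in full on every task,
    // so each (key, value) pair of the large side is joined in one pass.
    public static List<String> join(Map<Integer, String> small,
                                    List<Map.Entry<Integer, String>> large) {
        return large.stream()
            .filter(e -> small.containsKey(e.getKey()))           // inner join semantics
            .map(e -> small.get(e.getKey()) + "-" + e.getValue()) // combine both sides
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The "small" RDD, collected and broadcast as a lookup map.
        Map<Integer, String> small = new HashMap<>();
        small.put(1, "apple");
        small.put(2, "banana");

        // The "large" RDD, streamed through without shuffling.
        List<Map.Entry<Integer, String>> large = Arrays.asList(
            new AbstractMap.SimpleEntry<>(1, "red"),
            new AbstractMap.SimpleEntry<>(2, "yellow"),
            new AbstractMap.SimpleEntry<>(3, "green"));

        System.out.println(join(small, large)); // [apple-red, banana-yellow]
    }
}
```

As the thread notes, this only pays off when one side is small enough to collect and ship to every executor; with two large RDDs the shuffle join is unavoidable.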
Optimizations
Hi all,

If I have something like:

rdd.join(...).mapPartitionsToPair(...)

it looks like mapPartitionsToPair runs in a different stage than join. Is there a way to piggyback this computation inside the join stage, such that each result partition after join is passed to the mapPartitionsToPair function, all running in the same stage without any other costs?

Best, Marius
Re: Spark partitioning question
Turned out that it was sufficient to do repartitionAndSortWithinPartitions ... so far so good ;)

On Tue, May 5, 2015 at 9:45 AM Marius Danciu marius.dan...@gmail.com wrote:

Hi Imran,

Yes, that's what MyPartitioner does. I do see (using traces from MyPartitioner) that the key is partitioned on partition 0, but then I see this record arriving in both Yarn containers (I see it in the logs). Basically I need to emulate a Hadoop map-reduce job in Spark, and groupByKey seemed a natural fit (... I am aware of its limitations).

Thanks, Marius

On Mon, May 4, 2015 at 10:45 PM Imran Rashid iras...@cloudera.com wrote:

Hi Marius,

I am also a little confused -- are you saying that myPartitioner is basically something like:

class MyPartitioner extends Partitioner {
  def numPartitions = 1
  def getPartition(key: Any) = 0
}

?? If so, I don't understand how you'd ever end up with data in two partitions. Indeed, then everything before the call to partitionBy(myPartitioner) is somewhat irrelevant. The important point is that partitionBy should put all the data in one partition, and then the operations after that do not move data between partitions. So if you're really observing data in two partitions, it would be good to know more about what version of Spark you are on, your data, etc., as it sounds like a bug.

But I have a feeling there is some misunderstanding about what your partitioner is doing. E.g., I think doing groupByKey followed by sortByKey doesn't make a lot of sense -- in general one sortByKey is all you need (it's not exactly the same, but most probably close enough, and it avoids doing another expensive shuffle). If you can share a bit more information on your partitioner, and on what properties you need for your f, that might help.

thanks, Imran

On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu marius.dan...@gmail.com wrote:

Hello all,

I have the following Spark (pseudo)code:

rdd = mapPartitionsWithIndex(...)
  .mapPartitionsToPair(...)
  .groupByKey()
  .sortByKey(comparator)
  .partitionBy(myPartitioner)
  .mapPartitionsWithIndex(...)
  .mapPartitionsToPair( *f* )

The input data has 2 input splits (yarn 2.6.0). myPartitioner partitions all the records on partition 0, which is correct, so the intuition is that f provided to the last transformation (mapPartitionsToPair) would run sequentially inside a single yarn container. However, from the yarn logs I do see that both yarn containers are processing records from the same partition ... and *sometimes* the overall job fails (due to the code in f, which expects a certain order of records): yarn container 1 receives the records as expected, whereas yarn container 2 receives a subset of records, for a reason I cannot explain, and f fails. The overall behavior of this job is that sometimes it succeeds and sometimes it fails, apparently due to inconsistent propagation of sorted records to yarn containers.

If any of this makes any sense to you, please let me know what I am missing.

Best, Marius
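The single-partition partitioner Imran sketches in Scala can be re-expressed as a runnable plain-Java analogue (no Spark dependency; the bucketing loop below stands in for the shuffle that partitionBy triggers):

```java
import java.util.*;

public class SinglePartitioner {

    // Mirrors Imran's sketch: one partition, every key routed to index 0.
    public int numPartitions() { return 1; }
    public int getPartition(Object key) { return 0; }

    // Bucket records exactly the way a shuffle would, using getPartition.
    public static List<List<String>> partition(List<String> keys, SinglePartitioner p) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < p.numPartitions(); i++) {
            buckets.add(new ArrayList<>());
        }
        for (String k : keys) {
            buckets.get(p.getPartition(k)).add(k);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<List<String>> buckets =
            partition(Arrays.asList("a", "b", "c"), new SinglePartitioner());
        System.out.println(buckets); // [[a, b, c]] -- everything lands in partition 0
    }
}
```

This is exactly Imran's point: with such a partitioner there is only one bucket, so records observed in two containers would indicate either a bug or a different partitioner than the one shown.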
Re: Spark partitioning question
Hi Imran,

Yes, that's what MyPartitioner does. I do see (using traces from MyPartitioner) that the key is partitioned on partition 0, but then I see this record arriving in both Yarn containers (I see it in the logs). Basically I need to emulate a Hadoop map-reduce job in Spark, and groupByKey seemed a natural fit (... I am aware of its limitations).

Thanks, Marius

On Mon, May 4, 2015 at 10:45 PM Imran Rashid iras...@cloudera.com wrote:

Hi Marius,

I am also a little confused -- are you saying that myPartitioner is basically something like:

class MyPartitioner extends Partitioner {
  def numPartitions = 1
  def getPartition(key: Any) = 0
}

?? If so, I don't understand how you'd ever end up with data in two partitions. Indeed, then everything before the call to partitionBy(myPartitioner) is somewhat irrelevant. The important point is that partitionBy should put all the data in one partition, and then the operations after that do not move data between partitions. So if you're really observing data in two partitions, it would be good to know more about what version of Spark you are on, your data, etc., as it sounds like a bug.

But I have a feeling there is some misunderstanding about what your partitioner is doing. E.g., I think doing groupByKey followed by sortByKey doesn't make a lot of sense -- in general one sortByKey is all you need (it's not exactly the same, but most probably close enough, and it avoids doing another expensive shuffle). If you can share a bit more information on your partitioner, and on what properties you need for your f, that might help.

thanks, Imran

On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu marius.dan...@gmail.com wrote:

Hello all,

I have the following Spark (pseudo)code:

rdd = mapPartitionsWithIndex(...)
  .mapPartitionsToPair(...)
  .groupByKey()
  .sortByKey(comparator)
  .partitionBy(myPartitioner)
  .mapPartitionsWithIndex(...)
  .mapPartitionsToPair( *f* )

The input data has 2 input splits (yarn 2.6.0). myPartitioner partitions all the records on partition 0, which is correct, so the intuition is that f provided to the last transformation (mapPartitionsToPair) would run sequentially inside a single yarn container. However, from the yarn logs I do see that both yarn containers are processing records from the same partition ... and *sometimes* the overall job fails (due to the code in f, which expects a certain order of records): yarn container 1 receives the records as expected, whereas yarn container 2 receives a subset of records, for a reason I cannot explain, and f fails. The overall behavior of this job is that sometimes it succeeds and sometimes it fails, apparently due to inconsistent propagation of sorted records to yarn containers.

If any of this makes any sense to you, please let me know what I am missing.

Best, Marius
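Imran's "one sortByKey is all you need" point can be illustrated Spark-free: sorting the (key, value) pairs once puts equal keys into adjacent runs, so grouping falls out of a single linear pass over the sorted data, with no separate group-then-sort round (two shuffles in Spark). The class name and data are hypothetical.

```java
import java.util.*;

public class SortThenGroup {

    // One sort by key, then group adjacent equal keys in insertion order.
    // LinkedHashMap preserves the sorted key order for the caller.
    public static LinkedHashMap<String, List<Integer>> sortThenGroup(
            List<Map.Entry<String, Integer>> pairs) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(pairs);
        sorted.sort(Map.Entry.comparingByKey());
        LinkedHashMap<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : sorted) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = Arrays.asList(
            new AbstractMap.SimpleEntry<>("b", 2),
            new AbstractMap.SimpleEntry<>("a", 1),
            new AbstractMap.SimpleEntry<>("b", 3));
        System.out.println(sortThenGroup(pairs)); // {a=[1], b=[2, 3]}
    }
}
```

The analogy is not exact (Spark's groupByKey also changes the value type and its sortByKey is a distributed range sort), but it shows why a grouping step before sortByKey buys nothing that the sort does not already provide.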
Re: Spark partitioning question
Thank you Silvio,

I am aware of the groupBy limitations and it is subject to replacement. I did try repartitionAndSortWithinPartitions, but then I end up with maybe too much shuffling: one shuffle from groupByKey and the other from repartition.

My expectation was that since N records are partitioned to the same partition, say 0, doing a mapPartitions on the resulting RDD would place all records for partition 0 on a single node. It seems to me that this is not quite the case, since N can span multiple HDFS blocks and the subsequent mapPartitions operation would be parallelized over multiple nodes. In my case I see 2 yarn containers receiving records during a mapPartitions operation applied on the sorted partition. I need to test more, but it seems that applying the same partitioner again right before the last mapPartitions can help.

Best, Marius

On Tue, Apr 28, 2015 at 4:40 PM Silvio Fiorito silvio.fior...@granturing.com wrote:

Hi Marius,

What's the expected output? I would recommend avoiding groupByKey if possible, since it's going to force all records for each key to go to an executor, which may overload it. Also, if you need to sort and repartition, try using repartitionAndSortWithinPartitions to do it in one shot.

Thanks, Silvio

From: Marius Danciu
Date: Tuesday, April 28, 2015 at 8:10 AM
To: user
Subject: Spark partitioning question

Hello all,

I have the following Spark (pseudo)code:

rdd = mapPartitionsWithIndex(...)
  .mapPartitionsToPair(...)
  .groupByKey()
  .sortByKey(comparator)
  .partitionBy(myPartitioner)
  .mapPartitionsWithIndex(...)
  .mapPartitionsToPair( *f* )

The input data has 2 input splits (yarn 2.6.0). myPartitioner partitions all the records on partition 0, which is correct, so the intuition is that f provided to the last transformation (mapPartitionsToPair) would run sequentially inside a single yarn container. However, from the yarn logs I do see that both yarn containers are processing records from the same partition ... and *sometimes* the overall job fails (due to the code in f, which expects a certain order of records): yarn container 1 receives the records as expected, whereas yarn container 2 receives a subset of records, for a reason I cannot explain, and f fails. The overall behavior of this job is that sometimes it succeeds and sometimes it fails, apparently due to inconsistent propagation of sorted records to yarn containers.

If any of this makes any sense to you, please let me know what I am missing.

Best, Marius
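What repartitionAndSortWithinPartitions does in one shuffle can be sketched in plain Java: route each key to its bucket with a partitioner function, then sort each bucket independently (no global order across buckets). The even/odd partitioner and the data below are hypothetical.

```java
import java.util.*;
import java.util.function.Function;

public class RepartitionAndSort {

    // Mimics repartitionAndSortWithinPartitions: bucket by the partitioner,
    // then sort each bucket, which Spark performs as a single shuffle.
    public static List<List<Integer>> repartitionAndSort(
            List<Integer> keys, int numPartitions, Function<Integer, Integer> partitioner) {
        List<List<Integer>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (Integer k : keys) {
            buckets.get(partitioner.apply(k)).add(k);
        }
        for (List<Integer> b : buckets) {
            Collections.sort(b); // order holds within a partition only
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Hypothetical partitioner: even keys to partition 0, odd keys to partition 1.
        List<List<Integer>> out = repartitionAndSort(
            Arrays.asList(5, 2, 4, 1), 2, k -> k % 2);
        System.out.println(out); // [[2, 4], [1, 5]]
    }
}
```

This also shows the guarantee Marius was relying on: the ordering holds within each partition, so code like his f is safe only if a whole key's run of records is consumed inside one partition.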
Spark partitioning question
Hello all,

I have the following Spark (pseudo)code:

rdd = mapPartitionsWithIndex(...)
  .mapPartitionsToPair(...)
  .groupByKey()
  .sortByKey(comparator)
  .partitionBy(myPartitioner)
  .mapPartitionsWithIndex(...)
  .mapPartitionsToPair( *f* )

The input data has 2 input splits (yarn 2.6.0). myPartitioner partitions all the records on partition 0, which is correct, so the intuition is that f provided to the last transformation (mapPartitionsToPair) would run sequentially inside a single yarn container. However, from the yarn logs I do see that both yarn containers are processing records from the same partition ... and *sometimes* the overall job fails (due to the code in f, which expects a certain order of records): yarn container 1 receives the records as expected, whereas yarn container 2 receives a subset of records, for a reason I cannot explain, and f fails. The overall behavior of this job is that sometimes it succeeds and sometimes it fails, apparently due to inconsistent propagation of sorted records to yarn containers.

If any of this makes any sense to you, please let me know what I am missing.

Best, Marius
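The mapPartitionsWithIndex steps in the pseudocode above hand each function a whole partition plus its index, not single records. A plain-Java analogue (no Spark; the data is hypothetical) makes the contract concrete:

```java
import java.util.*;
import java.util.function.BiFunction;

public class MapPartitionsWithIndexSketch {

    // Apply f(partitionIndex, records) to each partition, like Spark's
    // mapPartitionsWithIndex: f sees one whole partition at a time.
    public static <T, R> List<List<R>> mapPartitionsWithIndex(
            List<List<T>> partitions, BiFunction<Integer, List<T>, List<R>> f) {
        List<List<R>> out = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            out.add(f.apply(i, partitions.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> parts = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("c"));
        List<List<String>> tagged = mapPartitionsWithIndex(parts, (i, recs) -> {
            List<String> r = new ArrayList<>();
            for (String s : recs) r.add(i + ":" + s); // tag each record with its partition
            return r;
        });
        System.out.println(tagged); // [[0:a, 0:b], [1:c]]
    }
}
```

In Spark each partition's call runs on whichever executor holds that partition, which is why f in the thread above was expected to run inside a single container once everything was routed to partition 0.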
Re: Shuffle question
Anyone?

On Tue, Apr 21, 2015 at 3:38 PM Marius Danciu marius.dan...@gmail.com wrote:

Hello everyone,

I have a question regarding the sort shuffle. Roughly, I'm doing something like:

rdd.mapPartitionsToPair(f1).groupByKey().mapPartitionsToPair(f2)

The problem is that in f2 I don't see the keys being sorted. The keys are Java Comparable, not scala.math.Ordered or scala.math.Ordering (it would be weird for each key to implement Ordering, as mentioned in the JIRA item https://issues.apache.org/jira/browse/SPARK-2045).

Questions:

1. Do I need to explicitly sortByKey? (If I do this, I can see the keys correctly sorted in f2.) ... but I'm worried about the extra costs, since Spark 1.3.0 is supposed to use the SORT shuffle manager by default, right?

2. Does each key need to be scala.math.Ordered? ... Is Java Comparable used at all? ... BTW, I'm using Spark from Java ... don't ask me why :)

Best, Marius
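The core of the question can be shown without Spark: hash-based grouping (what groupByKey gives the user) makes no promise about key order, and an explicit sort is what restores it. Java's java.lang.Comparable is all that plain-Java sorting needs; the data below is hypothetical.

```java
import java.util.*;

public class SortComparableKeys {

    // Keys come back from a HashMap in no guaranteed order; an explicit sort,
    // driven by the keys' natural Comparable order, fixes that.
    public static List<String> sortedKeys(Map<String, List<Integer>> grouped) {
        List<String> keys = new ArrayList<>(grouped.keySet());
        Collections.sort(keys); // uses String's Comparable implementation
        return keys;
    }

    public static void main(String[] args) {
        // Grouping alone (like groupByKey) guarantees nothing about key order.
        Map<String, List<Integer>> grouped = new HashMap<>();
        grouped.computeIfAbsent("pear", k -> new ArrayList<>()).add(2);
        grouped.computeIfAbsent("apple", k -> new ArrayList<>()).add(1);

        System.out.println(sortedKeys(grouped)); // [apple, pear]
    }
}
```

That mirrors the situation in the thread: the SORT shuffle manager is an internal shuffle implementation detail and does not sort the user-visible keys, so an explicit sortByKey (with its extra shuffle cost) is what produces ordered keys in f2.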