Re: Java 8 vs Scala

2015-07-17 Thread Marius Danciu
If you take the time to actually learn Scala starting from its fundamental
concepts AND, quite importantly, get familiar with general functional
programming concepts, you'd immediately realize what you'd really miss
going back to Java (8).

On Fri, Jul 17, 2015 at 8:14 AM Wojciech Pituła w.pit...@gmail.com wrote:

 IMHO only Scala is an option. Once you're familiar with it you just can't
 even look at Java code.

 Thu, 16.07.2015 at 07:20, user spark user
 spark_u...@yahoo.com.invalid wrote:

 I struggled a lot with Scala, almost 10 days with no improvement, but when I
 switched to Java 8 things were so smooth, and I used DataFrames with
 Redshift and Hive and all are looking good.
 If you are very good in Scala then go with Scala, otherwise Java is the
 best fit.

 This is just my opinion because I am a Java guy.



   On Wednesday, July 15, 2015 12:33 PM, vaquar khan 
 vaquar.k...@gmail.com wrote:


 My choice is Java 8
 On 15 Jul 2015 18:03, Alan Burlison alan.burli...@oracle.com wrote:

 On 15/07/2015 08:31, Ignacio Blasco wrote:

  The main advantage of using Scala vs Java 8 is being able to use a
 console


 https://bugs.openjdk.java.net/browse/JDK-8043364

 --
 Alan Burlison
 --

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






DataFrame from RDD[Row]

2015-07-16 Thread Marius Danciu
Hi,

This is an ugly solution because it requires pulling out a row:

val rdd: RDD[Row] = ...
ctx.createDataFrame(rdd, rdd.first().schema)

Is there a better alternative to get a DataFrame from an RDD[Row] since
toDF won't work as Row is not a Product ?
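
For reference, one alternative that avoids touching the data at all is to
build the schema up front as a StructType and pass it to createDataFrame
directly; a minimal sketch, assuming the column names and types are known
ahead of time (the field names and the context setup below are made up for
illustration):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val sc  = new SparkContext(new SparkConf().setAppName("rows").setMaster("local[2]"))
    val ctx = new SQLContext(sc)

    // Hypothetical schema; replace with the real column names and types.
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true)))

    val rdd: RDD[Row] = sc.parallelize(Seq(Row(1, "a"), Row(2, "b")))

    // createDataFrame(RDD[Row], StructType) never needs to look at a row,
    // so it avoids the rdd.first() call entirely.
    val df = ctx.createDataFrame(rdd, schema)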


Thanks,
Marius


Re: Optimizations

2015-07-03 Thread Marius Danciu
Thanks for your feedback. Yes, I am aware of the stage design, and Silvio,
what you are describing is essentially a map-side join, which is not
applicable when both RDDs are quite large.

It appears that with

rdd.join(...).mapToPair(f)

f is piggybacked inside the join stage (right in the reducers, I believe),
whereas with

rdd.join(...).mapPartitionsToPair( f )

f is executed in a different stage. This is surprising because, at least
intuitively, the difference between mapToPair and mapPartitionsToPair is that
the former is about the push model whereas the latter is about polling
records out of the iterator (*I suspect there are other technical reasons*).
If anyone knows the depths of the problem, it would be of great help.
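
One way to check where a transformation actually lands, instead of guessing,
is to inspect the RDD lineage with toDebugString (or the DAG in the Spark UI):
shuffle boundaries show up as new indentation levels, while narrow
transformations stay in the same block as their parent. A rough sketch in the
Scala API with made-up data (the question above uses the Java API's
mapToPair / mapPartitionsToPair, but the stage layout comes from the same RDD
machinery underneath):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    val sc = new SparkContext(new SparkConf().setAppName("stages").setMaster("local[2]"))

    val left: RDD[(Int, String)]  = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
    val right: RDD[(Int, String)] = sc.parallelize(Seq(1 -> "x", 2 -> "y"))

    // Both map and mapPartitions are narrow transformations, so in the debug
    // string they should appear at the same indentation level as the join's
    // output (i.e. no extra shuffle after the join).
    val viaMap        = left.join(right).map(identity)
    val viaPartitions = left.join(right).mapPartitions(_.map(identity))

    println(viaMap.toDebugString)
    println(viaPartitions.toDebugString)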

Best,
Marius

On Fri, Jul 3, 2015 at 6:43 PM Silvio Fiorito silvio.fior...@granturing.com
wrote:

   One thing you could do is a broadcast join. You take your smaller RDD and
 save it as a broadcast variable. Then run a map operation to perform the
 join and whatever else you need to do. This will remove a shuffle stage, but
 you will still have to collect the smaller RDD and broadcast it. It all
 depends on the size of your data whether it's worth it or not.
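
For reference, a minimal sketch of the broadcast (map-side) join described
above, in the Scala API and with made-up RDD names; only the small side is
collected to the driver and shipped to the executors as a broadcast variable:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    val sc = new SparkContext(new SparkConf().setAppName("bcast-join").setMaster("local[2]"))

    val bigRdd: RDD[(Int, String)]   = sc.parallelize(Seq(1 -> "big-a", 2 -> "big-b"))
    val smallRdd: RDD[(Int, String)] = sc.parallelize(Seq(1 -> "small-a"))

    // Collect the small side on the driver and broadcast it to every executor.
    val smallMap = sc.broadcast(smallRdd.collectAsMap())

    // A plain flatMap on the big side performs the join locally on each
    // partition, so no shuffle stage is added for the join itself.
    val joined: RDD[(Int, (String, String))] = bigRdd.flatMap { case (k, big) =>
      smallMap.value.get(k).map(small => (k, (big, small)))
    }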

   From: Marius Danciu
 Date: Friday, July 3, 2015 at 3:13 AM
 To: user
 Subject: Optimizations

   Hi all,

  If I have something like:

  rdd.join(...).mapPartitionsToPair(...)

  It looks like mapPartitionsToPair runs in a different stage than join. Is
 there a way to piggyback this computation inside the join stage? ... such
 that each result partition after the join is passed to
 the mapPartitionsToPair function, all running in the same stage without any
 other costs.

  Best,
 Marius



Optimizations

2015-07-03 Thread Marius Danciu
Hi all,

If I have something like:

rdd.join(...).mapPartitionsToPair(...)

It looks like mapPartitionsToPair runs in a different stage than join. Is
there a way to piggyback this computation inside the join stage? ... such
that each result partition after the join is passed to
the mapPartitionsToPair function, all running in the same stage without any
other costs.

Best,
Marius


Re: Spark partitioning question

2015-05-05 Thread Marius Danciu
Turned out that it was sufficient to do repartitionAndSortWithinPartitions
... so far so good ;)
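
For anyone landing on this thread, a minimal sketch of that approach, in Scala
and with made-up data; repartitionAndSortWithinPartitions does the
partitioning and the per-partition key sort in a single shuffle (the keys only
need an Ordering in scope, which Int has by default here):

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    val sc = new SparkContext(new SparkConf().setAppName("rasp").setMaster("local[2]"))

    // Hypothetical pair RDD standing in for the real data.
    val pairs: RDD[(Int, String)] = sc.parallelize(Seq(3 -> "c", 1 -> "a", 2 -> "b"))

    // One shuffle: records are routed by the partitioner and sorted by key
    // within each resulting partition.
    val partitionedAndSorted =
      pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))

    // Each partition can now be processed sequentially in key order.
    val processed = partitionedAndSorted.mapPartitions(_.map { case (k, v) => s"$k -> $v" })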

On Tue, May 5, 2015 at 9:45 AM Marius Danciu marius.dan...@gmail.com
wrote:

 Hi Imran,

 Yes, that's what MyPartitioner does. I do see (using traces from
 MyPartitioner) that the key is assigned to partition 0, but then I see
 records for that key arriving in both YARN containers (I see it in the logs).
 Basically I need to emulate a Hadoop map-reduce job in Spark, and groupByKey
 seemed a natural fit (... I am aware of its limitations).

 Thanks,
 Marius

 On Mon, May 4, 2015 at 10:45 PM Imran Rashid iras...@cloudera.com wrote:

 Hi Marius,

 I am also a little confused -- are you saying that myPartitions is
 basically something like:

 class MyPartitioner extends Partitioner {
   def numPartitions = 1
   def getPartition(key: Any) = 0
 }

 ??

 If so, I don't understand how you'd ever end up with data in two partitions.
 Indeed, then everything before the call to partitionBy(myPartitioner) is
 somewhat irrelevant.  The important point is that partitionBy should put
 all the data in one partition, and then the operations after that do not
 move data between partitions.  So if you're really observing data in two
 partitions, it would be good to know more about what version of Spark you
 are on, your data, etc., as it sounds like a bug.

 But I have a feeling there is some misunderstanding about what your
 partitioner is doing.  E.g., I think doing groupByKey followed by sortByKey
 doesn't make a lot of sense -- in general one sortByKey is all you need
 (it's not exactly the same, but most probably close enough, and it avoids
 doing another expensive shuffle).  If you can share a bit more information on
 your partitioner, and what properties you need for your f, that might
 help.

 thanks,
 Imran


 On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu marius.dan...@gmail.com
 wrote:

 Hello all,

 I have the following Spark (pseudo)code:

 rdd = mapPartitionsWithIndex(...)
 .mapPartitionsToPair(...)
 .groupByKey()
 .sortByKey(comparator)
 .partitionBy(myPartitioner)
 .mapPartitionsWithIndex(...)
 .mapPartitionsToPair( *f* )

 The input data has 2 input splits (yarn 2.6.0).
 myPartitioner partitions all the records on partition 0, which is
 correct, so the intuition is that f provided to the last transformation
 (mapPartitionsToPair) would run sequentially inside a single yarn
 container. However from yarn logs I do see that both yarn containers are
 processing records from the same partition ... and *sometimes* the
 overall job fails (due to the code in f which expects a certain order of
 records) and yarn container 1 receives the records as expected, whereas
 yarn container 2 receives a subset of records ... for a reason I cannot
 explain and f fails.

 The overall behavior of this job is that sometimes it succeeds and
 sometimes it fails ... apparently due to inconsistent propagation of sorted
 records to yarn containers.


 If any of this makes any sense to you, please let me know what I am
 missing.



 Best,
 Marius





Re: Spark partitioning question

2015-05-05 Thread Marius Danciu
Hi Imran,

Yes, that's what MyPartitioner does. I do see (using traces from
MyPartitioner) that the key is assigned to partition 0, but then I see
records for that key arriving in both YARN containers (I see it in the logs).
Basically I need to emulate a Hadoop map-reduce job in Spark, and groupByKey
seemed a natural fit (... I am aware of its limitations).

Thanks,
Marius

On Mon, May 4, 2015 at 10:45 PM Imran Rashid iras...@cloudera.com wrote:

 Hi Marius,

 I am also a little confused -- are you saying that myPartitions is
 basically something like:

 class MyPartitioner extends Partitioner {
   def numPartitions = 1
   def getPartition(key: Any) = 0
 }

 ??

 If so, I don't understand how you'd ever end up with data in two partitions.
 Indeed, then everything before the call to partitionBy(myPartitioner) is
 somewhat irrelevant.  The important point is that partitionBy should put
 all the data in one partition, and then the operations after that do not
 move data between partitions.  So if you're really observing data in two
 partitions, it would be good to know more about what version of Spark you
 are on, your data, etc., as it sounds like a bug.

 But I have a feeling there is some misunderstanding about what your
 partitioner is doing.  E.g., I think doing groupByKey followed by sortByKey
 doesn't make a lot of sense -- in general one sortByKey is all you need
 (it's not exactly the same, but most probably close enough, and it avoids
 doing another expensive shuffle).  If you can share a bit more information on
 your partitioner, and what properties you need for your f, that might
 help.

 thanks,
 Imran


 On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu marius.dan...@gmail.com
 wrote:

 Hello all,

 I have the following Spark (pseudo)code:

 rdd = mapPartitionsWithIndex(...)
 .mapPartitionsToPair(...)
 .groupByKey()
 .sortByKey(comparator)
 .partitionBy(myPartitioner)
 .mapPartitionsWithIndex(...)
 .mapPartitionsToPair( *f* )

 The input data has 2 input splits (yarn 2.6.0).
 myPartitioner partitions all the records on partition 0, which is
 correct, so the intuition is that f provided to the last transformation
 (mapPartitionsToPair) would run sequentially inside a single yarn
 container. However from yarn logs I do see that both yarn containers are
 processing records from the same partition ... and *sometimes* the overall
 job fails (due to the code in f which expects a certain order of
 records) and yarn container 1 receives the records as expected, whereas
 yarn container 2 receives a subset of records ... for a reason I cannot
 explain and f fails.

 The overall behavior of this job is that sometimes it succeeds and
 sometimes it fails ... apparently due to inconsistent propagation of sorted
 records to yarn containers.


 If any of this makes any sense to you, please let me know what I am
 missing.



 Best,
 Marius





Re: Spark partitioning question

2015-04-28 Thread Marius Danciu
Thank you Silvio,

I am aware of groupBy's limitations and it is subject to replacement.

I did try repartitionAndSortWithinPartitions, but then I end up with maybe
too much shuffling: one shuffle from groupByKey and another from the repartition.

My expectation was that since the N records are partitioned to the same
partition ... say 0, doing a mapPartitions on the resulting RDD would place
all records for partition 0 into a single partition on a single node. It
seems to me that this is not quite the case, since the N records can span
multiple HDFS blocks and the subsequent mapPartitions operation would be
parallelized on multiple nodes. In my case I see 2 YARN containers receiving
records during a mapPartitions operation applied on the sorted partition. I
need to test more, but it seems that applying the same partitioner again
right before the last mapPartitions can help (see the sketch below).
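
A rough sketch of that last idea, with made-up types and a single-partition
partitioner like the one discussed elsewhere in this thread: re-apply the
partitioner right before the final mapPartitions, and pass
preservesPartitioning = true so the result keeps that partitioner:

    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical partitioner that sends every record to partition 0.
    class SinglePartitioner extends Partitioner {
      def numPartitions: Int = 1
      def getPartition(key: Any): Int = 0
    }

    val sc = new SparkContext(new SparkConf().setAppName("repartition").setMaster("local[2]"))

    // Stand-in for the RDD produced by the earlier groupByKey/sortByKey steps.
    val sortedPairs: RDD[(Int, Iterable[String])] =
      sc.parallelize(Seq(1 -> Iterable("a", "b"), 2 -> Iterable("c")))

    val result = sortedPairs
      .partitionBy(new SinglePartitioner)   // force all records onto one partition
      .mapPartitions(iter => iter.map(identity), preservesPartitioning = true)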

Best,
Marius

On Tue, Apr 28, 2015 at 4:40 PM Silvio Fiorito 
silvio.fior...@granturing.com wrote:

   Hi Marius,

  What’s the expected output?

   I would recommend avoiding the groupByKey if possible since it's going
 to force all records for each key to go to a single executor, which may
 overload it.

  Also if you need to sort and repartition, try using
 repartitionAndSortWithinPartitions to do it in one shot.

  Thanks,
 Silvio

   From: Marius Danciu
 Date: Tuesday, April 28, 2015 at 8:10 AM
 To: user
 Subject: Spark partitioning question

   Hello all,

  I have the following Spark (pseudo)code:

  rdd = mapPartitionsWithIndex(...)
 .mapPartitionsToPair(...)
 .groupByKey()
 .sortByKey(comparator)
 .partitionBy(myPartitioner)
  .mapPartitionsWithIndex(...)
 .mapPartitionsToPair( *f* )

  The input data has 2 input splits (yarn 2.6.0).
 myPartitioner partitions all the records on partition 0, which is correct,
 so the intuition is that f provided to the last transformation
 (mapPartitionsToPair) would run sequentially inside a single yarn
 container. However from yarn logs I do see that both yarn containers are
 processing records from the same partition ... and *sometimes* the overall
 job fails (due to the code in f which expects a certain order of
 records) and yarn container 1 receives the records as expected, whereas
 yarn container 2 receives a subset of records ... for a reason I cannot
 explain and f fails.

  The overall behavior of this job is that sometimes it succeeds and
 sometimes it fails ... apparently due to inconsistent propagation of sorted
 records to yarn containers.


  If any of this makes any sense to you, please let me know what I am
 missing.



  Best,
 Marius



Spark partitioning question

2015-04-28 Thread Marius Danciu
Hello all,

I have the following Spark (pseudo)code:

rdd = mapPartitionsWithIndex(...)
.mapPartitionsToPair(...)
.groupByKey()
.sortByKey(comparator)
.partitionBy(myPartitioner)
.mapPartitionsWithIndex(...)
.mapPartitionsToPair( *f* )

The input data has 2 input splits (yarn 2.6.0).
myPartitioner partitions all the records on partition 0, which is correct,
so the intuition is that f provided to the last transformation
(mapPartitionsToPair) would run sequentially inside a single yarn
container. However from yarn logs I do see that both yarn containers are
processing records from the same partition ... and *sometimes* the overall
job fails (due to the code in f which expects a certain order of
records) and yarn container 1 receives the records as expected, whereas
yarn container 2 receives a subset of records ... for a reason I cannot
explain and f fails.

The overall behavior of this job is that sometimes it succeeds and
sometimes it fails ... apparently due to inconsistent propagation of sorted
records to yarn containers.


If any of this makes any sense to you, please let me know what I am missing.



Best,
Marius


Re: Shuffle question

2015-04-22 Thread Marius Danciu
Anyone ?


On Tue, Apr 21, 2015 at 3:38 PM Marius Danciu marius.dan...@gmail.com
wrote:

 Hello anyone,

 I have a question regarding the sort shuffle. Roughly I'm doing something
 like:

 rdd.mapPartitionsToPair(f1).groupByKey().mapPartitionsToPair(f2)

 The problem is that in f2 I don't see the keys being sorted. The keys are
 Java Comparable, not scala.math.Ordered or scala.math.Ordering (it would be
 weird for each key to implement Ordering, as mentioned in the JIRA item
 https://issues.apache.org/jira/browse/SPARK-2045).

 Questions:
 1. Do I need to explicitly sortByKey ? (if I do this I can see the keys
 correctly sorted in f2) ... but I'm worried about the extra costs since
 Spark 1.3.0 is supposed to use the SORT shuffle manager by default, right ?
 2. Do I need each key to be a scala.math.Ordered ? ... is Java Comparable
 used at all ?

 ... btw I'm using Spark from Java ... don't ask me why :)
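
 Regarding question 1: as far as I can tell groupByKey makes no guarantee
 about key order even with the sort-based shuffle (the shuffle sorts by
 partition, not by key), so an explicit sort is still needed if f2 relies on
 key order. A small sketch of that, in Scala for brevity and with made-up
 data; for key types that are only java.lang.Comparable, Scala's
 Ordering.ordered (or Ordering.comparatorToOrdering, given a Comparator) can
 supply the Ordering that sortByKey requires:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    val sc = new SparkContext(new SparkConf().setAppName("shuffle-sort").setMaster("local[2]"))

    // Hypothetical pair RDD; Int keys already have an implicit Ordering.
    val pairs: RDD[(Int, String)] = sc.parallelize(Seq(3 -> "c", 1 -> "a", 2 -> "b"))

    // groupByKey only groups; key order within a partition is not guaranteed.
    val grouped = pairs.groupByKey()

    // Explicit sort by key before the per-partition processing in f2.
    val sorted = grouped.sortByKey()

    val result = sorted.mapPartitions(_.map { case (k, vs) => (k, vs.size) })

 As noted elsewhere in this archive, a single sortByKey may make the
 groupByKey unnecessary, depending on what f2 actually needs.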



 Best,
 Marius