Re: Secondary Sort using Apache Spark 1.6

2017-03-29 Thread Pariksheet Barapatre
Many Thanks Yong.

Your solution rocks. If you could paste your answer on Stack Overflow, then
I can mark it as the correct answer.

Also, can you tell me how to achieve the same using a companion object?
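
A minimal sketch of the companion-object approach (untested, assuming the same
DeviceKey fields discussed below) could look like this:

case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)

object DeviceKey {
  // An implicit Ordering in the companion object is picked up automatically
  // by repartitionAndSortWithinPartitions; EventTs is negated so timestamps
  // sort in descending order.
  implicit val ordering: Ordering[DeviceKey] =
    Ordering.by(k => (k.serialNum, k.eventDate, -k.EventTs))
}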

Cheers
Pari

On 29 March 2017 at 21:37, Yong Zhang  wrote:

> The error message indeed is not very clear.
>
>
> What you did wrong is that the repartitionAndSortWithinPartitions not
> only requires PairRDD, but also OrderedRDD. Your case class as key is NOT
> Ordered.
>
>
> Either you extends it from Ordered, or provide a companion object to do
> the implicit Ordering.
>
>
> scala> spark.version
> res1: String = 2.1.0
>
> scala> case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)
>      |     extends Ordered[DeviceKey] {
>      |   import scala.math.Ordered.orderingToOrdered
>      |   def compare(that: DeviceKey): Int =
>      |     (this.serialNum, this.eventDate, this.EventTs * -1) compare
>      |     (that.serialNum, that.eventDate, that.EventTs * -1)
>      | }
> defined class DeviceKey
>
> scala> val t = sc.parallelize(List((DeviceKey("2","100",1),1), (DeviceKey("2","100",3),1)), 1)
> t: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26
>
> scala> class DeviceKeyPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
>      |   require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
>      |
>      |   override def numPartitions: Int = partitions
>      |
>      |   override def getPartition(key: Any): Int = {
>      |     val k = key.asInstanceOf[DeviceKey]
>      |     k.serialNum.hashCode() % numPartitions
>      |   }
>      | }
> defined class DeviceKeyPartitioner
>
> scala> t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2))
> res0: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ShuffledRDD[1] at repartitionAndSortWithinPartitions at <console>:30
>
>
> Yong
>
>
> --
> *From:* Pariksheet Barapatre 
> *Sent:* Wednesday, March 29, 2017 9:02 AM
> *To:* user
> *Subject:* Secondary Sort using Apache Spark 1.6
>
> Hi,
> 
>
> I am referring web link http://codingjunkie.net/spark-secondary-sort/ to
> implement secondary sort in my spark job.
>
> I have defined my key case class as
>
> case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) {
>   implicit def orderingBySerialNum[A <: DeviceKey] : Ordering[A] = {
>Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))
> }
> }
>
> but when I try to apply function
> t.repartitionAndSortWithinPartitions(partitioner)
>
> #t is a RDD[(DeviceKey, Int)]
>
> I get error
> I am getting error as -
> value repartitionAndSortWithinPartitions is not a member of 
> org.apache.spark.rdd.RDD[(DeviceKey, Int)]
>
>
> Example code available at
> http://stackoverflow.com/questions/43038682/secondary-sort-using-apache-spark-1-6
>
> Could somebody help me to understand error.
>
> Many Thanks
>
> Pari
>
>
> --
> Cheers,
> Pari
>



-- 
Cheers,
Pari


Re: How best we can store streaming data on dashboards for real time user experience?

2017-03-29 Thread Noorul Islam Kamal Malmiyoda
I think a better place would be an in-memory cache for real-time use.

Regards,
Noorul
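
A minimal sketch of that pattern, assuming Spark Streaming with a socket source
and the Jedis Redis client (the hosts, ports, and key names below are made up):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

object DashboardFeed {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("dashboard-feed"), Seconds(5))

    // Hypothetical source: one event name per line.
    val events = ssc.socketTextStream("localhost", 9999)
    val counts = events.map(e => (e, 1L)).reduceByKey(_ + _)

    // Push each micro-batch's aggregates into Redis; the dashboard polls these keys.
    counts.foreachRDD { rdd =>
      rdd.foreachPartition { part =>
        val jedis = new Jedis("localhost", 6379) // assumed Redis endpoint
        part.foreach { case (k, c) => jedis.set(s"dashboard:$k", c.toString) }
        jedis.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

The same idea applies to any low-latency store (Redis, Memcached, Cassandra, etc.);
the dashboard then reads from that store rather than from HDFS.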

On Thu, Mar 30, 2017 at 10:31 AM, Gaurav1809  wrote:
> I am getting streaming data and want to show them onto dashboards in real
> time?
> May I know how best we can handle these streaming data? where to store? (DB
> or HDFS or ???)
> I want to give users a real time analytics experience.
>
> Please suggest possible ways. Thanks.
>
>
>
>



How best we can store streaming data on dashboards for real time user experience?

2017-03-29 Thread Gaurav1809
I am getting streaming data and want to show it on dashboards in real time.
How best can we handle this streaming data? Where should it be stored (DB,
HDFS, or something else)?
I want to give users a real-time analytics experience.

Please suggest possible ways. Thanks.







Re: Why VectorUDT private?

2017-03-29 Thread Ryan
Spark version 2.1.0; the vector is from the ml package. The Vector in mllib has
a public VectorUDT type.
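
A sketch of the kind of schema check being discussed, assuming Spark 2.x where
(as far as I know) the ml package keeps VectorUDT private but exposes the same
type publicly through SQLDataTypes.VectorType. Note the mllib VectorUDT mentioned
above only matches mllib vectors, not ml vectors:

import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.sql.types.StructType

def validateVectorColumn(schema: StructType, colName: String): Unit = {
  // SQLDataTypes.VectorType is the public handle to the (private) ml VectorUDT.
  require(schema(colName).dataType == SQLDataTypes.VectorType,
    s"Column $colName must be a vector column, found ${schema(colName).dataType}")
}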

On Thu, Mar 30, 2017 at 10:57 AM, Ryan  wrote:

> I'm writing a transformer and the input column is vector type(which is the
> output column from other transformer). But as the VectorUDT is private, how
> could I check/transform schema for the vector column?
>


Why VectorUDT private?

2017-03-29 Thread Ryan
I'm writing a transformer and the input column is of vector type (which is the
output column from another transformer). But as the VectorUDT is private, how
can I check/transform the schema for the vector column?


Re: Need help for RDD/DF transformation.

2017-03-29 Thread Mungeol Heo
Hello Yong,

First of all, thank you for your attention.
Note that, within the same list, the elements that have values in RDD/DF 1
will always have the same value.
Therefore, "1" and "3", which come from RDD/DF 1, will always have the same
value, which is "a".

The goal here is to assign that same value to the elements of the list which
do not exist in RDD/DF 1, so that all the elements in the same list have the
same value.

Alternatively, the final RDD/DF can also look like this:

[1, 2, 3], a
[4, 5], b

Thank you again.

- Mungeol
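
One way to get the result described above, sketched with the RDD API. This
assumes rdd1: RDD[(Int, String)] and rdd2: RDD[Seq[Int]], that the id-to-value
pairs fit in driver memory, and, as stated above, that all matched ids in a
list map to the same value:

// Broadcast the known id -> value pairs, e.g. Map(1 -> "a", 3 -> "a", 5 -> "b").
val labels = sc.broadcast(rdd1.collectAsMap())

// For each list, look up the value of any element present in RDD/DF 1
// and assign it to every element of that list.
val result = rdd2.flatMap { ids =>
  val value = ids.collectFirst { case id if labels.value.contains(id) => labels.value(id) }
  value.toSeq.flatMap(v => ids.map(id => (id, v)))
}
// result: (1,a), (2,a), (3,a), (4,b), (5,b)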


On Wed, Mar 29, 2017 at 9:03 PM, Yong Zhang  wrote:
> What is the desired result for
>
>
> RDD/DF 1
>
> 1, a
> 3, c
> 5, b
>
> RDD/DF 2
>
> [1, 2, 3]
> [4, 5]
>
>
> Yong
>
> 
> From: Mungeol Heo 
> Sent: Wednesday, March 29, 2017 5:37 AM
> To: user@spark.apache.org
> Subject: Need help for RDD/DF transformation.
>
> Hello,
>
> Suppose, I have two RDD or data frame like addressed below.
>
> RDD/DF 1
>
> 1, a
> 3, a
> 5, b
>
> RDD/DF 2
>
> [1, 2, 3]
> [4, 5]
>
> I need to create a new RDD/DF like below from RDD/DF 1 and 2.
>
> 1, a
> 2, a
> 3, a
> 4, b
> 5, b
>
> Is there an efficient way to do this?
> Any help will be great.
>
> Thank you.
>
>




Re: Spark streaming + kafka error with json library

2017-03-29 Thread Tathagata Das
Try depending on "spark-streaming-kafka-0-10_2.11" (not the assembly)
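
In SBT terms that would be something like the following (a sketch; the version
should match your Spark version):

// instead of the spark-streaming-kafka-0-10-assembly artifact
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"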

On Wed, Mar 29, 2017 at 9:59 AM, Srikanth  wrote:

> Hello,
>
> I'm trying to use "org.json4s" % "json4s-native" library in a spark
> streaming + kafka direct app.
> When I use the latest version of the lib I get an error similar to this
> 
> The work around suggest there is to use version 3.2.10. As spark has a
> hard dependency on this version.
>
> I forced this version in SBT with
> dependencyOverrides += "org.json4s" %% "json4s-native" % "3.2.10"
>
> But now it seems to have some conflict with spark-streaming-kafka-0-10-
> assembly
>
> [error] (*:assembly) deduplicate: different file contents found in the
> following:
>
> [error] C:\Users\stati\.ivy2\cache\org.apache.spark\spark-
> streaming-kafka-0-10-assembly_2.11\jars\spark-streaming-
> kafka-0-10-assembly_2.11-2.1.0.jar:scala/util/parsing/combinator/
> ImplicitConversions$$anonfun$flatten2$1.class
>
> [error] C:\Users\stati\.ivy2\cache\org.scala-lang.modules\scala-
> parser-combinators_2.11\bundles\scala-parser-combinators_2.11-1.0.4.jar:
> scala/util/parsing/combinator/ImplicitConversions$$anonfun$
> flatten2$1.class
>
> DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling
> json4s-native.
> Any idea how to resolve this? I'm using spark version 2.1.0
>
> Thanks,
> Srikanth
>


Re: Spark SQL, dataframe join questions.

2017-03-29 Thread vaquar khan
HI ,

I found following two links are helpful sharing with you .

http://stackoverflow.com/questions/38353524/how-to-ensure-partitioning-induced-by-spark-dataframe-join

http://spark.apache.org/docs/latest/configuration.html


Regards,
Vaquar khan

On Wed, Mar 29, 2017 at 2:45 PM, Vidya Sujeet 
wrote:

> In repartition, every element in the partition is moved to a new
> partition..doing a full shuffle compared to shuffles done by reduceBy
> clauses. With this in mind, repartition would increase your query
> performance. ReduceBy key will also shuffle based on the aggregation.
>
> The best way to design is to check the query plan of your data frame join
> query and do RDD joins accordingly, if needed.
>
>
> On Wed, Mar 29, 2017 at 10:55 AM, Yong Zhang  wrote:
>
>> You don't need to repartition your data just for join purpose. But if the
>> either parties of join is already partitioned, Spark will use this
>> advantage as part of join optimization.
>>
>> Should you reduceByKey before the join really depend on your join logic.
>> ReduceByKey will shuffle, and following join COULD cause another shuffle.
>> So I am not sure if it is a smart way.
>>
>> Yong
>>
>> --
>> *From:* shyla deshpande 
>> *Sent:* Wednesday, March 29, 2017 12:33 PM
>> *To:* user
>> *Subject:* Re: Spark SQL, dataframe join questions.
>>
>>
>>
>> On Tue, Mar 28, 2017 at 2:57 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Following are my questions. Thank you.
>>>
>>> 1. When joining dataframes is it a good idea to repartition on the key 
>>> column that is used in the join or
>>> the optimizer is too smart so forget it.
>>>
>>> 2. In RDD join, wherever possible we do reduceByKey before the join to 
>>> avoid a big shuffle of data. Do we need
>>> to do anything similar with dataframe joins, or the optimizer is too smart 
>>> so forget it.
>>>
>>>
>>
>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783

IT Architect / Lead Consultant
Greater Chicago


Re: KMean clustering resulting Skewed Issue

2017-03-29 Thread Asher Krim
As I said in my previous reply, I don't think k-means is the right tool to
start with. Try LDA with k (the number of latent topics) set to 3 and go up
to, say, 20. The problem likely lies in the feature vectors, on which you
provided almost no information. Text is not taken from a continuous space,
so any bag-of-words approach to clustering will likely fail unless you
first convert the features to a smaller and denser space.
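
A rough sketch of that direction with Spark ML, assuming a DataFrame docs with a
"features" column of term-count vectors (the parameter values are illustrative):

import org.apache.spark.ml.clustering.LDA

val lda = new LDA().setK(3).setMaxIter(50).setFeaturesCol("features")
val model = lda.fit(docs)
// Adds a per-document topic-mixture column, a much denser representation
// than the raw bag-of-words vectors.
val withTopics = model.transform(docs)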

Asher Krim
Senior Software Engineer

On Wed, Mar 29, 2017 at 5:49 PM, Reth RM  wrote:

> Hi Krim,
>
>   The dataset that I am experimenting with is ground-truth labeled and it has 3
> types of docs: one with terms relevant to topic1 (sports), another with topic2
> (technology), and thirdly topic3 (biology), so k is set to 3 and the
> features are distinct in each topic (total features close to 1230). I think
> the issue is with centroid convergence. I have been testing with different
> iteration counts, and I was assuming that with a higher iteration count the
> centroids would converge at one point and not shift after that, and
> 'computeCost' would remain close to the same. However, when I test with
> incremental iteration counts and obtain the 'cost' at each iteration (or a
> window of 5 iterations each), the cost keeps shifting invariably. The table
> below is iteration count vs. cost. I passed different epsilon values thinking
> that might lead to consistent convergence, but no luck. Screenshot [1] shows
> the different iteration counts and epsilon vs. cost.
>
>
> Any thoughts on what I am doing wrong here?
>
>
> iteration  cost
> *3*        *1.841406859*
> *4*        *1.750348983*
> *5*        *1.514564993*
> 6          1.514564993
> 7          1.514564993
> 8          1.514564993
> 9          1.514564993
> 10         1.514564993
> 11         1.514564993
> 12         1.514564993
> *13*       *1.750348983*
> *14*       *1.750348983*
> *15*       *1.514564993*
> 16         1.514564993
> 17         1.514564993
> 18         1.514564993
> *19*       *1.514564993*
> *20*       *1.750348983*
>
> [1] https://s04.justpaste.it/files/justpaste/d417/a15312908/screen_shot_2017-03-29_at_2_46_42_pm.png
>
>
>
>
> On Sun, Mar 26, 2017 at 4:46 AM, Asher Krim  wrote:
>
>> Hi,
>>
>> Do you mean that you're running K-Means directly on tf-idf bag-of-words
>> vectors? I think your results are expected because of the general lack of
>> big overlap between one-hot encoded vectors. The similarity between most
>> vectors is expected to be very close to zero. Those that do end up in the
>> same cluster likely have a lot of similar boilerplate text (assuming the
>> training data comes from crawled news articles, they likely have similar
>> menus and header/footer text).
>>
>> I would suggest you try some dimensionality reduction on the tf-idf
>> vectors first. You have many options to choose from (LSA, LDA,
>> document2vec, etc). Other than that, this isn't a Spark question.
>>
>> Asher Krim
>> Senior Software Engineer
>>
>> On Fri, Mar 24, 2017 at 9:37 PM, Reth RM  wrote:
>>
>>> Hi,
>>>
>>>   I am using Spark k-means for clustering records that consist of news
>>> documents; vectors are created by applying tf-idf. The dataset that I am
>>> using for testing right now is the ground-truth classified
>>> http://qwone.com/~jason/20Newsgroups/
>>>
>>> The issue is that all the documents are getting assigned to the same cluster,
>>> and the others just have the vector (doc) that was picked as the cluster
>>> center (skewed clustering). What could be the possible reasons for this
>>> issue, any suggestions? Should I be retuning the epsilon?
>>>
>>>
>>>
>>>
>>>
>>
>>
>


Re: Spark SQL, dataframe join questions.

2017-03-29 Thread Vidya Sujeet
In repartition, every element in the partition is moved to a new
partition, doing a full shuffle, compared to the shuffles done by reduceBy
clauses. With this in mind, repartitioning would increase your query
performance. reduceByKey will also shuffle, based on the aggregation.

The best way to design is to check the query plan of your data frame join
query and do RDD joins accordingly, if needed.
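
A quick way to inspect that plan (a sketch, assuming two DataFrames df1 and df2
that share a join column named "key"):

val joined = df1.join(df2, Seq("key"))
// Prints the parsed, analyzed, optimized, and physical plans; look for Exchange
// operators to see where Spark actually plans a shuffle.
joined.explain(true)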


On Wed, Mar 29, 2017 at 10:55 AM, Yong Zhang  wrote:

> You don't need to repartition your data just for join purpose. But if the
> either parties of join is already partitioned, Spark will use this
> advantage as part of join optimization.
>
> Should you reduceByKey before the join really depend on your join logic.
> ReduceByKey will shuffle, and following join COULD cause another shuffle.
> So I am not sure if it is a smart way.
>
> Yong
>
> --
> *From:* shyla deshpande 
> *Sent:* Wednesday, March 29, 2017 12:33 PM
> *To:* user
> *Subject:* Re: Spark SQL, dataframe join questions.
>
>
>
> On Tue, Mar 28, 2017 at 2:57 PM, shyla deshpande  > wrote:
>
>> Following are my questions. Thank you.
>>
>> 1. When joining dataframes is it a good idea to repartition on the key 
>> column that is used in the join or
>> the optimizer is too smart so forget it.
>>
>> 2. In RDD join, wherever possible we do reduceByKey before the join to avoid 
>> a big shuffle of data. Do we need
>> to do anything similar with dataframe joins, or the optimizer is too smart 
>> so forget it.
>>
>>
>


Re: Collaborative filtering steps in spark

2017-03-29 Thread chris snow
Thanks Nick, that helps with my understanding of ALS.


On Wed, 29 Mar 2017 at 14:41, Nick Pentreath 
wrote:

> No, it does a random initialization. It does use a slightly different
> approach from pure normal random - it chooses non-negative draws which
> results in very slightly better results empirically.
>
> In practice I'm not sure if the average rating approach will make a big
> difference (it's been a long while since I read the paper!)
>
> Sean put the absolute value init stuff in originally if I recall so may
> have more context.
>
> Though in fact looking at the code now, I see the comment still says that,
> but I'm not convinced the code actually does it:
>
> /**
>  * Initializes factors randomly given the in-link blocks.
>  *
>  * @param inBlocks in-link blocks
>  * @param rank rank
>  * @return initialized factor blocks
>  */
> private def initialize[ID](
> inBlocks: RDD[(Int, InBlock[ID])],
> rank: Int,
> seed: Long): RDD[(Int, FactorBlock)] = {
>   // Choose a unit vector uniformly at random from the unit sphere, but from the
>   // "first quadrant" where all elements are nonnegative. This can be done by choosing
>   // elements distributed as Normal(0,1) and taking the absolute value, and then normalizing.
>   // This appears to create factorizations that have a slightly better reconstruction
>   // (<1%) compared picking elements uniformly at random in [0,1].
>   inBlocks.map { case (srcBlockId, inBlock) =>
> val random = new XORShiftRandom(byteswap64(seed ^ srcBlockId))
> val factors = Array.fill(inBlock.srcIds.length) {
>   val factor = Array.fill(rank)(random.nextGaussian().toFloat)
>   val nrm = blas.snrm2(rank, factor, 1)
>   blas.sscal(rank, 1.0f / nrm, factor, 1)
>   factor
> }
> (srcBlockId, factors)
>   }
> }
>
>
> factor is ~ N(0, 1) and then scaled by the L2 norm, but it looks to me the
> abs value is never taken before scaling which is what the comment
> indicates...
>
>
> On Mon, 27 Mar 2017 at 00:55 chris snow  wrote:
>
> In the paper “Large-Scale Parallel Collaborative Filtering for the
> Netflix Prize”, the following steps are described for ALS:
>
> Step 1 Initialize matrix M by assigning the average rating for that
> movie as the first row, and
> small random numbers for the remaining entries.
> Step 2 Fix M, Solve U by minimizing the objective function (the sum of
> squared errors);
> Step 3 Fix U, solve M by minimizing the objective function similarly;
> Step 4 Repeat Steps 2 and 3 until a stopping criterion is satisfied.
>
> Does spark take the average rating for the movie as the first row?
> I've looked through the source code, but I can't see the average
> rating being calculated for the movie.
>
> Many thanks,
>
> Chris
>
>
>


Alternatives for dataframe collectAsList()

2017-03-29 Thread szep.laszlo.it
Hi,

after I created a dataset

Dataset<Row> df = sqlContext.sql("query");

I need to get the result values, so I call the method collectAsList():

List<Row> list = df.collectAsList();

But it's very slow if I work with large datasets (20-30 million records). I
know the result isn't already present in the driver app; that's why it takes a
long time, because collectAsList() collects all data from the worker nodes.

But then what is the right way to get the result values? Is there another
solution to iterate over the result dataset's rows, or to get the values? Can
anyone post a small & working example?

Thanks & Regards,
Laszlo Szep
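
One commonly suggested alternative, sketched here (in Scala; the same methods
exist on the Java Dataset API) rather than offered as a definitive answer:
stream the rows to the driver one partition at a time instead of materializing
all of them at once, or keep the processing on the executors.

// Streams rows to the driver partition by partition, so the full 20-30M rows
// never need to fit in driver memory at once.
val it = df.toLocalIterator()
while (it.hasNext) {
  val row = it.next()
  // process row
}

// Or avoid collecting entirely and do the work on the executors:
df.foreach(row => { /* process row */ })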






Returning DataFrame for text file

2017-03-29 Thread George Obama
Hi,

I saw that in the API, whether R or Scala, we are returning a DataFrame for
sparkSession.read.text().

What's the rationale behind this?

Regards,
George

Re: Spark SQL, dataframe join questions.

2017-03-29 Thread Yong Zhang
You don't need to repartition your data just for join purposes. But if either
party of the join is already partitioned, Spark will use this advantage as
part of join optimization.

Whether you should reduceByKey before the join really depends on your join logic.
reduceByKey will shuffle, and the following join COULD cause another shuffle. So I
am not sure it is a smart way.

Yong


From: shyla deshpande 
Sent: Wednesday, March 29, 2017 12:33 PM
To: user
Subject: Re: Spark SQL, dataframe join questions.



On Tue, Mar 28, 2017 at 2:57 PM, shyla deshpande 
> wrote:

Following are my questions. Thank you.

1. When joining dataframes is it a good idea to repartition on the key column 
that is used in the join or
the optimizer is too smart so forget it.

2. In RDD join, wherever possible we do reduceByKey before the join to avoid a 
big shuffle of data. Do we need
to do anything similar with dataframe joins, or the optimizer is too smart so 
forget it.



Spark streaming + kafka error with json library

2017-03-29 Thread Srikanth
Hello,

I'm trying to use "org.json4s" % "json4s-native" library in a spark
streaming + kafka direct app.
When I use the latest version of the lib I get an error similar to this

The workaround suggested there is to use version 3.2.10, as Spark has a hard
dependency on this version.

I forced this version in SBT with
dependencyOverrides += "org.json4s" %% "json4s-native" % "3.2.10"

But now it seems to have some conflict with
spark-streaming-kafka-0-10-assembly

[error] (*:assembly) deduplicate: different file contents found in the
following:

[error]
C:\Users\stati\.ivy2\cache\org.apache.spark\spark-streaming-kafka-0-10-assembly_2.11\jars\spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class

[error]
C:\Users\stati\.ivy2\cache\org.scala-lang.modules\scala-parser-combinators_2.11\bundles\scala-parser-combinators_2.11-1.0.4.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class

DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling
json4s-native.
Any idea how to resolve this? I'm using spark version 2.1.0

Thanks,
Srikanth


Re: Spark SQL, dataframe join questions.

2017-03-29 Thread shyla deshpande
On Tue, Mar 28, 2017 at 2:57 PM, shyla deshpande 
wrote:

> Following are my questions. Thank you.
>
> 1. When joining dataframes, is it a good idea to repartition on the key column
> that is used in the join, or is the optimizer smart enough that we can forget it?
>
> 2. In RDD joins, wherever possible we do reduceByKey before the join to avoid
> a big shuffle of data. Do we need to do anything similar with dataframe joins,
> or is the optimizer smart enough that we can forget it?
>
>


Re: Secondary Sort using Apache Spark 1.6

2017-03-29 Thread Yong Zhang
The error message indeed is not very clear.


What you did wrong is that repartitionAndSortWithinPartitions requires not only
a PairRDD, but also an OrderedRDD. Your case class used as the key is NOT Ordered.


Either you extend it from Ordered, or you provide a companion object to supply the
implicit Ordering.


scala> spark.version
res1: String = 2.1.0

scala> case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)
extends Ordered[DeviceKey] {
 |   import scala.math.Ordered.orderingToOrdered
 |   def compare(that: DeviceKey): Int =
 |  (this.serialNum, this.eventDate, this.EventTs * -1) compare
 |  (that.serialNum, that.eventDate, that.EventTs * -1)
 | }
defined class DeviceKey

scala>

scala> val t = sc.parallelize(List(((DeviceKey("2","100",1),1)),
(DeviceKey("2","100",3),1)), 1)
t: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ParallelCollectionRDD[0] at 
parallelize at <console>:26

scala>

scala> class DeviceKeyPartitioner(partitions: Int) extends 
org.apache.spark.Partitioner {
 | require(partitions >= 0, s"Number of partitions ($partitions) cannot 
be negative.")
 |
 | override def numPartitions: Int = partitions
 |
 | override def getPartition(key: Any): Int = {
 |   val k = key.asInstanceOf[DeviceKey]
 |   k.serialNum.hashCode() % numPartitions
 | }
 | }
defined class DeviceKeyPartitioner

scala>

scala> t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2))
res0: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ShuffledRDD[1] at 
repartitionAndSortWithinPartitions at <console>:30


Yong



From: Pariksheet Barapatre 
Sent: Wednesday, March 29, 2017 9:02 AM
To: user
Subject: Secondary Sort using Apache Spark 1.6

Hi,

I am referring web link http://codingjunkie.net/spark-secondary-sort/ to 
implement secondary sort in my spark job.

I have defined my key case class as

case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) {
  implicit def orderingBySerialNum[A <: DeviceKey] : Ordering[A] = {
   Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))
}
}


but when I try to apply function
t.repartitionAndSortWithinPartitions(partitioner)


#t is a RDD[(DeviceKey, Int)]


I get error
I am getting error as -
value repartitionAndSortWithinPartitions is not a member of 
org.apache.spark.rdd.RDD[(DeviceKey, Int)]



Example code available at
http://stackoverflow.com/questions/43038682/secondary-sort-using-apache-spark-1-6


Could somebody help me to understand error.


Many Thanks

Pari

--
Cheers,
Pari


httpclient conflict in spark

2017-03-29 Thread Arvind Kandaswamy
Hello,

I am getting the following error. I get this error when trying to use AWS
S3. This appears to be a conflict with httpclient. AWS S3 comes with
httpclient-4.5.2.jar. I am not sure how to force Spark to use this version.
I have tried spark.driver.userClassPathFirst = true,
spark.executor.userClassPathFirst=true. Did not help. I am using Zeppelin
to call the spark engine in case if that is an issue.

Is there anything else that I can try?

java.lang.NoSuchMethodError: org.apache.http.conn.ssl.SSLConnectionSocketFactory.<init>(Ljavax/net/ssl/SSLContext;Ljavax/net/ssl/HostnameVerifier;)V
at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.<init>(SdkTLSSocketFactory.java:56)
at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.getPreferredSocketFactory(ApacheConnectionManagerFactory.java:92)
at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.create(ApacheConnectionManagerFactory.java:65)
at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.create(ApacheConnectionManagerFactory.java:58)
at com.amazonaws.http.apache.client.impl.ApacheHttpClientFactory.create(ApacheHttpClientFactory.java:51)
at com.amazonaws.http.apache.client.impl.ApacheHttpClientFactory.create(ApacheHttpClientFactory.java:39)
at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:314)
at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:298)
at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:165)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:583)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:563)
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:541)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235)


Re: Collaborative filtering steps in spark

2017-03-29 Thread Nick Pentreath
No, it does a random initialization. It does use a slightly different
approach from pure normal random - it chooses non-negative draws which
results in very slightly better results empirically.

In practice I'm not sure if the average rating approach will make a big
difference (it's been a long while since I read the paper!)

Sean put the absolute value init stuff in originally if I recall so may
have more context.

Though in fact looking at the code now, I see the comment still says that,
but I'm not convinced the code actually does it:

/**
 * Initializes factors randomly given the in-link blocks.
 *
 * @param inBlocks in-link blocks
 * @param rank rank
 * @return initialized factor blocks
 */
private def initialize[ID](
inBlocks: RDD[(Int, InBlock[ID])],
rank: Int,
seed: Long): RDD[(Int, FactorBlock)] = {
  // Choose a unit vector uniformly at random from the unit sphere, but from the
  // "first quadrant" where all elements are nonnegative. This can be done by choosing
  // elements distributed as Normal(0,1) and taking the absolute value, and then normalizing.
  // This appears to create factorizations that have a slightly better reconstruction
  // (<1%) compared picking elements uniformly at random in [0,1].
  inBlocks.map { case (srcBlockId, inBlock) =>
val random = new XORShiftRandom(byteswap64(seed ^ srcBlockId))
val factors = Array.fill(inBlock.srcIds.length) {
  val factor = Array.fill(rank)(random.nextGaussian().toFloat)
  val nrm = blas.snrm2(rank, factor, 1)
  blas.sscal(rank, 1.0f / nrm, factor, 1)
  factor
}
(srcBlockId, factors)
  }
}


factor is ~ N(0, 1) and then scaled by the L2 norm, but it looks to me like the
abs value is never taken before scaling, even though that is what the comment
indicates...
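
For comparison, the behavior the comment describes would differ from the code
above only in the draw itself (a sketch):

  // Nonnegative draws, as the comment describes: abs of N(0,1), then normalize.
  val factor = Array.fill(rank)(math.abs(random.nextGaussian()).toFloat)
  val nrm = blas.snrm2(rank, factor, 1)
  blas.sscal(rank, 1.0f / nrm, factor, 1)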


On Mon, 27 Mar 2017 at 00:55 chris snow  wrote:

> In the paper “Large-Scale Parallel Collaborative Filtering for the
> Netflix Prize”, the following steps are described for ALS:
>
> Step 1 Initialize matrix M by assigning the average rating for that
> movie as the first row, and
> small random numbers for the remaining entries.
> Step 2 Fix M, Solve U by minimizing the objective function (the sum of
> squared errors);
> Step 3 Fix U, solve M by minimizing the objective function similarly;
> Step 4 Repeat Steps 2 and 3 until a stopping criterion is satisfied.
>
> Does spark take the average rating for the movie as the first row?
> I've looked through the source code, but I can't see the average
> rating being calculated for the movie.
>
> Many thanks,
>
> Chris
>
>
>


Issues with partitionBy method on data frame writer SPARK 2.0.2

2017-03-29 Thread Luke Swift
Hello, I am trying to write parquet files from a data frame. I am able to
use partitionBy("year", "month", "day") and Spark correctly physically
partitions the data in the directory structure I expect.

The issue is that when the partitions themselves are anything non-trivial in
size, the memory usage seems to blow up and I am getting a lot of GC
pressure on my cluster. There is lots of red in the executors tab on the
web UI for all the executors in the GC time column. If I try to
coalesce the data frame's RDD to get reasonably sized output files, the job
falls over due to GC pressure.

Removing the partitionBy and writing directly to the output destination
alleviates the problem; however, we would like this functionality to improve
our query performance in engines like Hive.

I am running Spark 2.0.2 on EMR 5.3.1, using pretty large nodes
(c3.4xlarge, with 30 GB of RAM per node) and each executor gets 5.5 GB. I saw
some previous mails about a similar issue, but that was back in Spark 1.4
days and it seems to have been resolved, yet I still have this issue.

Any help would be appreciated.
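
A pattern often suggested for this situation, sketched under the assumption of a
DataFrame df with year/month/day columns and a made-up output path: repartition
on the partition columns first, so each task writes to only a handful of
partition directories instead of keeping writers open for all of them.

import org.apache.spark.sql.functions.col

df.repartition(col("year"), col("month"), col("day"))
  .write
  .partitionBy("year", "month", "day")
  .parquet("s3://my-bucket/output/")   // hypothetical destination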


Secondary Sort using Apache Spark 1.6

2017-03-29 Thread Pariksheet Barapatre
 Hi,


I am referring web link http://codingjunkie.net/spark-secondary-sort/ to
implement secondary sort in my spark job.

I have defined my key case class as

case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) {
  implicit def orderingBySerialNum[A <: DeviceKey] : Ordering[A] = {
   Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))
}
}

but when I try to apply function
t.repartitionAndSortWithinPartitions(partitioner)

#t is a RDD[(DeviceKey, Int)]

I am getting the error:
value repartitionAndSortWithinPartitions is not a member of
org.apache.spark.rdd.RDD[(DeviceKey, Int)]


Example code available at
http://stackoverflow.com/questions/43038682/secondary-sort-using-apache-spark-1-6

Could somebody help me to understand the error.

Many Thanks

Pari


-- 
Cheers,
Pari


Re: Need help for RDD/DF transformation.

2017-03-29 Thread Yong Zhang
What is the desired result for


RDD/DF 1

1, a
3, c
5, b

RDD/DF 2

[1, 2, 3]
[4, 5]

Yong


From: Mungeol Heo 
Sent: Wednesday, March 29, 2017 5:37 AM
To: user@spark.apache.org
Subject: Need help for RDD/DF transformation.

Hello,

Suppose, I have two RDD or data frame like addressed below.

RDD/DF 1

1, a
3, a
5, b

RDD/DF 2

[1, 2, 3]
[4, 5]

I need to create a new RDD/DF like below from RDD/DF 1 and 2.

1, a
2, a
3, a
4, b
5, b

Is there an efficient way to do this?
Any help will be great.

Thank you.




Need help for RDD/DF transformation.

2017-03-29 Thread Mungeol Heo
Hello,

Suppose, I have two RDD or data frame like addressed below.

RDD/DF 1

1, a
3, a
5, b

RDD/DF 2

[1, 2, 3]
[4, 5]

I need to create a new RDD/DF like below from RDD/DF 1 and 2.

1, a
2, a
3, a
4, b
5, b

Is there an efficient way to do this?
Any help will be great.

Thank you.




Re: Upgrade the scala code using the most updated Spark version

2017-03-29 Thread Anahita Talebi
Hi,

Thanks, everybody, for helping me solve my problem :)
As Zhu said, I had to use mapPartitionsWithIndex in my code.
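
For example, a minimal sketch of the replacement (the RDD and logic here are
made up; the two methods have the same shape):

// Spark 1.x: rdd.mapPartitionsWithSplit((idx, iter) => ...)
// Spark 2.x:
val tagged = rdd.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(x => (idx, x)) // tag each element with its partition index
}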

Thanks,
Have a nice day,
Anahita

On Wed, Mar 29, 2017 at 2:51 AM, Shixiong(Ryan) Zhu  wrote:

> mapPartitionsWithSplit was removed in Spark 2.0.0. You can
> use mapPartitionsWithIndex instead.
>
> On Tue, Mar 28, 2017 at 3:52 PM, Anahita Talebi  > wrote:
>
>> Thanks.
>> I tried this one, as well. Unfortunately I still get the same error.
>>
>>
>> On Wednesday, March 29, 2017, Marco Mistroni  wrote:
>>
>>> 1.7.5
>>>
>>> On 28 Mar 2017 10:10 pm, "Anahita Talebi" 
>>> wrote:
>>>
 Hi,

 Thanks for your answer.
 What is the version of "org.slf4j" % "slf4j-api" in your sbt file?
 I think the problem might come from this part.

 On Tue, Mar 28, 2017 at 11:02 PM, Marco Mistroni 
 wrote:

> Hello
>  uhm ihave a project whose build,sbt is closest to yours, where i am
> using spark 2.1, scala 2.11 and scalatest (i upgraded to 3.0.0) and it
> works fine
> in my projects though i don thave any of the following libraries that
> you mention
> - breeze
> - netlib,all
> -  scoopt
>
> hth
>
> On Tue, Mar 28, 2017 at 9:10 PM, Anahita Talebi <
> anahita.t.am...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for your answer.
>>
>> I first changed the scala version to 2.11.8 and kept the spark
>> version 1.5.2 (old version). Then I changed the scalatest version into
>> "3.0.1". With this configuration, I could run the code and compile it and
>> generate the .jar file.
>>
>> When I changed the spark version into 2.1.0, I get the same error as
>> before. So I imagine the problem should be somehow related to the version
>> of spark.
>>
>> Cheers,
>> Anahita
>>
>> 
>> 
>> 
>> import AssemblyKeys._
>>
>> assemblySettings
>>
>> name := "proxcocoa"
>>
>> version := "0.1"
>>
>> organization := "edu.berkeley.cs.amplab"
>>
>> scalaVersion := "2.11.8"
>>
>> parallelExecution in Test := false
>>
>> {
>>   val excludeHadoop = ExclusionRule(organization =
>> "org.apache.hadoop")
>>   libraryDependencies ++= Seq(
>> "org.slf4j" % "slf4j-api" % "1.7.2",
>> "org.slf4j" % "slf4j-log4j12" % "1.7.2",
>> "org.scalatest" %% "scalatest" % "3.0.1" % "test",
>> "org.apache.spark" %% "spark-core" % "2.1.0"
>> excludeAll(excludeHadoop),
>> "org.apache.spark" %% "spark-mllib" % "2.1.0"
>> excludeAll(excludeHadoop),
>> "org.apache.spark" %% "spark-sql" % "2.1.0"
>> excludeAll(excludeHadoop),
>> "org.apache.commons" % "commons-compress" % "1.7",
>> "commons-io" % "commons-io" % "2.4",
>> "org.scalanlp" % "breeze_2.11" % "0.11.2",
>> "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
>> "com.github.scopt" %% "scopt" % "3.3.0"
>>   )
>> }
>>
>> {
>>   val defaultHadoopVersion = "1.0.4"
>>   val hadoopVersion =
>> scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION",
>> defaultHadoopVersion)
>>   libraryDependencies += "org.apache.hadoop" % "hadoop-client" %
>> hadoopVersion
>> }
>>
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>> "2.1.0"
>>
>> resolvers ++= Seq(
>>   "Local Maven Repository" at Path.userHome.asFile.toURI.toURL +
>> ".m2/repository",
>>   "Typesafe" at "http://repo.typesafe.com/typesafe/releases",
>>   "Spray" at "http://repo.spray.cc"
>> )
>>
>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>   {
>> case PathList("javax", "servlet", xs @ _*)   =>
>> MergeStrategy.first
>> case PathList(ps @ _*) if ps.last endsWith ".html"   =>
>> MergeStrategy.first
>> case "application.conf"  =>
>> MergeStrategy.concat
>> case "reference.conf"=>
>> MergeStrategy.concat
>> case "log4j.properties"  =>
>> MergeStrategy.discard
>> case m if m.toLowerCase.endsWith("manifest.mf")  =>
>> MergeStrategy.discard
>> case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  =>
>> MergeStrategy.discard
>> case _ => MergeStrategy.first
>>   }
>> }
>>
>> test in assembly := {}
>> 
>> 
>> 
>>
>> On Tue, Mar 28, 2017 at 9:33 PM, Marco 

Re: dataframe join questions. Appreciate your input.

2017-03-29 Thread shyla deshpande
On Tue, Mar 28, 2017 at 2:57 PM, shyla deshpande 
wrote:

> Following are my questions. Thank you.
>
> 1. When joining dataframes, is it a good idea to repartition on the key column
> that is used in the join, or is the optimizer smart enough that we can forget it?
>
> 2. In RDD joins, wherever possible we do reduceByKey before the join to avoid
> a big shuffle of data. Do we need to do anything similar with dataframe joins,
> or is the optimizer smart enough that we can forget it?
>
>