Re: Saving data frames on Spark Master/Driver

2016-07-14 Thread Sun Rui
You can simply save the join result in a distributed way, for example as an HDFS file, 
and then copy the HDFS file to a local file.

Besides collect(), there is a more memory-efficient way to bring distributed data back 
to the driver: toLocalIterator. The iterator consumes as much memory as the 
largest partition in your dataset.

You can use DataFrame.rdd.toLocalIterator() with Spark versions prior to 2.0. 
You can use Dataset.toLocalIterator() with Spark 2.0. 
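A minimal sketch of the toLocalIterator approach, assuming the goal is a single local CSV file on the driver. The helper only needs an iterator of row tuples, so with PySpark you would pass `df.rdd.toLocalIterator()` (before 2.0) or `df.toLocalIterator()` (2.0); `stream_rows_to_csv` and the output path are hypothetical names. Rows are written as they arrive, so driver memory is bounded by roughly one partition rather than the whole dataset.

```python
import csv
from typing import Iterable, Sequence


def stream_rows_to_csv(rows: Iterable[Sequence], path: str, header: Sequence[str]) -> int:
    """Write rows to a local CSV file as they arrive; return the number of data rows.

    `rows` can be any iterator. With PySpark it would typically be
    df.rdd.toLocalIterator() (before 2.0) or df.toLocalIterator() (2.0+),
    which fetch one partition at a time instead of the whole dataset.
    """
    written = 0
    with open(path, "w", newline="") as out:
        writer = csv.writer(out)
        writer.writerow(header)
        for row in rows:  # only the current partition needs to be in driver memory
            writer.writerow(row)
            written += 1
    return written
```

For example, on Spark 2.0: `stream_rows_to_csv(joineddataframe.toLocalIterator(), "/tmp/joined.csv", joineddataframe.columns)`. As far as I know, toLocalIterator launches a separate job per partition, so it trades speed for memory compared with collect().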

For details, refer to https://issues.apache.org/jira/browse/SPARK-14334 


> On Jul 15, 2016, at 09:05, Pedro Rodriguez  wrote:
> 
> Out of curiosity, is there a way to pull all the data back to the driver to 
> save without collect()? That is, stream the data in chunks back to the driver 
> so that maximum memory used comparable to a single node’s data, but all the 
> data is saved on one node.
> 
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
> 
> pedrorodriguez.io  | 909-353-4423
> github.com/EntilZha  | LinkedIn 



Re: Saving data frames on Spark Master/Driver

2016-07-14 Thread Taotao.Li
Hi, consider converting the DataFrame to an RDD and then using *rdd.toLocalIterator* to
collect the data on the driver node.

On Fri, Jul 15, 2016 at 9:05 AM, Pedro Rodriguez 
wrote:

> Out of curiosity, is there a way to pull all the data back to the driver
> to save without collect()? That is, stream the data in chunks back to the
> driver so that maximum memory used comparable to a single node’s data, but
> all the data is saved on one node.
>
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
>
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn


-- 
*___*
Quant | Engineer | Boy
*___*
*blog*:http://litaotao.github.io

*github*: www.github.com/litaotao


How to recommend most similar users using Spark ML

2016-07-14 Thread jeremycod
Hi,

I need to develop a service that recommends to each user other, similar
users they can connect to. For each user I have data about their
preferences for specific items, in the form:

user, item, preference  
1,75,   0.89  
2,168,  0.478  
2,99,   0.321  
3,31,   0.012

So far, I have implemented an approach using cosine similarity that compares one
user's feature vector with those of all other users:

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.jblas.DoubleMatrix

// Cosine similarity between two factor vectors.
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double =
  vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())

// Prints the recNumber users whose ALS factor vectors are most similar to userId's.
def user2usersimilarity(model: MatrixFactorizationModel, userId: Int, recNumber: Int): Unit = {
  val userVector = new DoubleMatrix(model.userFeatures.lookup(userId).head)
  val sims = model.userFeatures
    .filter { case (id, _) => id != userId } // exclude the user itself
    .map { case (id, factor) => (id, cosineSimilarity(new DoubleMatrix(factor), userVector)) }
  // top() keeps only recNumber elements per partition instead of sorting everything
  val sortedSims = sims.top(recNumber)(Ordering.by[(Int, Double), Double](_._2))
  println(sortedSims.mkString("\n"))
}

This approach works fine with the MovieLens dataset in terms of the quality of
the recommendations. However, I am concerned about the performance of this
algorithm. Since I have to generate recommendations for all users in the
system, this approach compares each user with every other user, so the
number of comparisons grows quadratically with the number of users.

I would appreciate it if somebody could suggest how to limit the comparison
of each user to the top N neighbors, or some other algorithm that would work
better for my use case.
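One commonly used alternative to exact all-pairs comparison (a swapped-in technique, not something proposed in this thread) is locality-sensitive hashing with random hyperplanes: vectors pointing in similar directions tend to fall on the same side of random hyperplanes, so cosine similarity only needs to be computed inside each bucket. The sketch is plain Python over a small dict of user factors (what `model.userFeatures` would contain); `lsh_top_n`, `num_planes` and `seed` are hypothetical names, and the result is approximate because true neighbours that land in different buckets are missed.

```python
import heapq
import math
import random
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple


def normalize(v: Sequence[float]) -> List[float]:
    """Scale a vector to unit length so that a dot product equals cosine similarity."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v] if norm else list(v)


def lsh_top_n(factors: Dict[int, Sequence[float]], n: int,
              num_planes: int = 8, seed: int = 42) -> Dict[int, List[Tuple[int, float]]]:
    """Approximate top-n cosine neighbours per user via random-hyperplane LSH."""
    if not factors:
        return {}
    rng = random.Random(seed)
    dim = len(next(iter(factors.values())))
    # Each random hyperplane contributes one bit: which side of it the vector lies on.
    planes = [[rng.gauss(0.0, 1.0) for _ in range(dim)] for _ in range(num_planes)]
    unit = {user: normalize(vec) for user, vec in factors.items()}

    buckets: Dict[Tuple[bool, ...], List[int]] = defaultdict(list)
    for user, vec in unit.items():
        signature = tuple(sum(p * x for p, x in zip(plane, vec)) >= 0 for plane in planes)
        buckets[signature].append(user)

    result: Dict[int, List[Tuple[int, float]]] = {}
    for members in buckets.values():
        for user in members:
            # Only users sharing the same signature are compared.
            candidates = ((other, sum(a * b for a, b in zip(unit[user], unit[other])))
                          for other in members if other != user)
            result[user] = heapq.nlargest(n, candidates, key=lambda pair: pair[1])
    return result
```

In Spark the same idea would map to computing the signature in a `map`, grouping by signature, and scoring pairs within each group. More planes give smaller buckets (faster) but lower recall; using several independent hash tables is the usual way to win recall back.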

Thanks,
Zoran




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recommend-most-similar-users-using-Spark-ML-tp27342.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-07-14 Thread Sunita Arvind
Thank you for your input. I will test it out and share my findings.



On Thursday, July 14, 2016, CosminC  wrote:

> Didn't have the time to investigate much further, but the one thing that
> popped out is that partitioning was no longer working on 1.6.1. This would
> definitely explain the 2x performance loss.
>
> Checking 1.5.1 Spark logs for the same application showed that our
> partitioner was working correctly, and after the DStream / RDD creation a
> user session was only processed on a single machine. Running on top of
> 1.6.1
> though, the session was processed on up to 4 machines, in a 5 node cluster
> including the driver, with a lot of redundant operations. We use a custom
> but very simple partitioner which extends HashPartitioner. It partitions on
> a case class which has a single string parameter.
>
> Speculative execution is turned off by default, and we never enabled it,
> so it's not that.
>
> Right now we're postponing any Spark upgrade, and we'll probably try to
> upgrade directly to Spark 2.0, hoping the partitioning issue is no longer
> present there.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-tp27056p27334.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>
>


Re: Saving data frames on Spark Master/Driver

2016-07-14 Thread Pedro Rodriguez
Out of curiosity, is there a way to pull all the data back to the driver to 
save it without collect()? That is, stream the data back to the driver in chunks 
so that the maximum memory used is comparable to a single node's data, but all the 
data is saved on one node.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn




How to check if a data frame is cached?

2016-07-14 Thread Cesar
Is there a simpler way to check if a data frame is cached other than:

dataframe.registerTempTable("cachedOutput")
assert(hc.isCached("cachedOutput"), "The table was not cached")


Thanks!
-- 
Cesar Flores


Re: Maximum Size of Reference Look Up Table in Spark

2016-07-14 Thread Jacek Laskowski
Hi,

My understanding is that the maximum size of a broadcast variable is
Long.MAX_VALUE (and effectively somewhat more, since the data is encoded
to save space, especially for Catalyst-driven datasets).

Regarding 2: before tasks can access the broadcast variable, it has to be
sent across the network, which may be too slow to be acceptable.
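A rough back-of-the-envelope estimate for question 2, under an assumption not stated in the thread: each of the E executors ends up holding its own copy of a broadcast value of size S, and the driver holds one more (deserialized objects can be larger than the raw data). E = 20 is a hypothetical cluster size.

```latex
M_{\text{cluster}} \approx (E + 1)\,S,
\qquad E = 20,\; S = 10\ \text{GB} \;\Rightarrow\; M_{\text{cluster}} \approx 21 \times 10\ \text{GB} = 210\ \text{GB}
```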


Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski





Re: Saving data frames on Spark Master/Driver

2016-07-14 Thread Jacek Laskowski
Hi,

Please reconsider, since this moves the entire distributed dataset onto
the single machine of the driver and may lead to an OutOfMemoryError
(OOME). It is better practice to save your result to HDFS, S3, or any
other distributed filesystem that is accessible by both the driver and
the executors.

If you insist...

Use collect() after select() and work with Array[T].

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski





Saving data frames on Spark Master/Driver

2016-07-14 Thread vr.n. nachiappan
Hello,
I am using data frames to join two Cassandra tables.
Currently, when I invoke save on the data frames as shown below, it saves the 
join results on the executor nodes. 
joineddataframe.select(,  
...).format("com.databricks.spark.csv").option("header", "true").save()
I would like to persist the results of the join on the Spark master/driver node. Is 
it possible to save the results on the Spark master/driver, and if so, how?
I appreciate your help.
Nachi


SparkStreaming multiple output operations failure semantics / error propagation

2016-07-14 Thread Martin Eden
Hi,

I have a Spark 1.6.2 streaming job with multiple output operations (jobs)
doing idempotent changes in different repositories.

The problem is that I want to somehow pass errors from one output operation
to another such that in the current output operation I only update
previously successful messages. This has to propagate all the way to the
last job which is supposed to only ACK the successfully processed messages
to the input queue, leaving the unsuccessful ones un-ACKED for later
processing.

The overall desired behaviour is best effort / fail fast, leaving the
messages which were not successfully processed by all output operations in
the input queue for retrying later.

Is there a pattern for achieving this in SparkStreaming?

If not, can Spark Streaming at least guarantee that if the previous
job/output operation in the batch fails, it does not execute the next
jobs/output operations?
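For illustration only, one way to get this "pass along only what succeeded" behaviour is to thread the set of surviving messages through the steps yourself, for example by doing all the writes inside a single foreachRDD/foreachPartition instead of as separate output operations. This is a pattern sketch, not a built-in Spark Streaming feature; the plain-Python model below covers one partition of one batch, and `run_output_ops`, `ops` and `ack` are hypothetical stand-ins for your repository writers and the queue ACK call.

```python
from typing import Callable, Iterable, List


def run_output_ops(messages: Iterable[str],
                   ops: List[Callable[[str], None]],
                   ack: Callable[[List[str]], None]) -> List[str]:
    """Apply each output step in order, passing on only the messages that succeeded.

    Messages that fail any step are never ACKed, so they stay in the input
    queue for a later retry (fail fast per message, best effort per batch).
    """
    surviving = list(messages)
    for op in ops:
        succeeded = []
        for msg in surviving:
            try:
                op(msg)  # assumed idempotent, as in the original setup
                succeeded.append(msg)
            except Exception:
                pass  # drop from later steps; it will not be ACKed
        surviving = succeeded
    ack(surviving)  # ACK only fully processed messages
    return surviving
```

Because every step is idempotent, re-running a partially processed message on retry is safe, which is what makes skipping the ACK an acceptable failure strategy here.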

Thanks in advance,
M


Maximum Size of Reference Look Up Table in Spark

2016-07-14 Thread Saravanan Subramanian
Hello All,
I am in the middle of designing real-time data enhancement services using Spark 
Streaming. As part of this, I have to look up some reference data while 
processing the incoming stream.
I have the following questions:
1) What is the maximum size of a lookup table/variable that can be stored as a 
broadcast variable?
2) What is the impact on cluster performance if I store 10 GB of data in a 
broadcast variable?
Any suggestions and thoughts are welcome.
Thanks,
Saravanan S.

Filtering RDD Using Spark.mllib's ChiSqSelector

2016-07-14 Thread Tobi Bosede
Hi everyone,

I am trying to filter my features based on the spark.mllib ChiSqSelector.

filteredData = vectorizedTestPar.map(lambda lp: LabeledPoint(lp.label,
model.transform(lp.features)))

However when I do the following I get the error below. Is there any other
way to filter my data to avoid this error?

filteredDataDF=filteredData.toDF()

Exception: It appears that you are attempting to reference
SparkContext from a broadcast variable, action, or transforamtion.
SparkContext can only be used on the driver, not in code that it run
on workers. For more information, see SPARK-5063.


I would directly use the spark.ml ChiSqSelector and work with
DataFrames, but I am on Spark 1.4 and using PySpark, so spark.ml's
ChiSqSelector is not available to me. filteredData is of type
PipelinedRDD, if that helps; it is not a regular RDD. I think that may be
part of why calling toDF() is not working.


Thanks,

Tobi


Re: Standalone cluster node utilization

2016-07-14 Thread Jakub Stransky
I am seeing really weird behavior when loading the data from an RDBMS.

I tried a different approach for loading the data: I provided a partitioning
column to get parallel, partitioned reads:

val df_init = sqlContext.read.format("jdbc").options(
  Map("url" -> Configuration.dbUrl,
"dbtable" -> Configuration.dbTable,
"driver" -> Configuration.dbDriver,
"partitionColumn"-> "Target",
"lowerBound" -> "3",
"upperBound" -> "9",
"numPartitions" -> Configuration.appPartitioning.toString
  )).load()


But when I check the Storage tab in the UI, I see the following distribution:

Data Distribution on 7 Executors
Host                  Memory Usage                   Disk Usage
spark1.clust:56209    145.3 MB (16.1 GB Remaining)   0.0 B
10.2.0.4:50305        0.0 B (37.2 GB Remaining)      0.0 B
spark5.clust:41822    112.0 B (16.9 GB Remaining)    0.0 B
spark4.clust:56687    112.0 B (16.9 GB Remaining)    0.0 B
spark3.clust:34263    0.0 B (16.9 GB Remaining)      0.0 B
spark2.clust:43663    112.0 B (16.9 GB Remaining)    0.0 B
spark0.clust:57445    112.0 B (16.9 GB Remaining)    0.0 B

Almost all the data resides on one node; the rest is negligible. Any idea
what might be wrong with this setting? I admit that the partitioning field is
not uniformly distributed, but

Later on during the computation I try to repartition the data frames, but the
effect is that the data gets collected on one node.

val df_init = sqlContext.read.format("jdbc").options(
  Map("url" -> Configuration.dbUrl,
    "dbtable" -> Configuration.dbTable,
    "driver" -> Configuration.dbDriver,
    "partitionColumn" -> "Target",
    "lowerBound" -> "3",
    "upperBound" -> "9",
    //"numPartitions" -> Configuration.appPartitioning.toString
    "numPartitions" -> "35"
  )).load()

df_init.cache()

df_init.registerTempTable("df_init")

val df = (if (Configuration.dataSubset) {
  // args(1) and args(2) are both read, so at least three arguments are needed.
  // Lower-case names: an upper-case identifier in a pattern is treated as a constant, not a new binding.
  val (loadingCondition, typeId) =
    if (args.length > 2) (args(1), args(2))
    else (Configuration.dataCondition, Configuration.dataType)

  sqlContext.sql(
    s"""SELECT statement ... Condition = '$loadingCondition'""".stripMargin)
} else {
  df_init
}).repartition(Configuration.appPartitioning)

df.persist()

It seems that none of these actually work as expected, and I cannot
distribute the data across the cluster. Could someone more experienced
provide some hints on what might be wrong?
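One thing worth checking: if I remember the Spark 1.x JDBC source correctly (`JDBCRelation.columnPartition`; please verify against your version), it ignores the data distribution entirely. It splits [lowerBound, upperBound] into numPartitions strides using integer division, and rows outside the bounds are not filtered but fall into the first or last partition. The Python sketch below reproduces that logic as I recall it; `jdbc_where_clauses` is a hypothetical name. With lowerBound=3, upperBound=9 and numPartitions=35 the stride comes out as 9/35 - 3/35 = 0, so every row with Target >= 3 would be read by the last partition, which would match the skew above.

```python
from typing import List, Optional


def jdbc_where_clauses(column: str, lower: int, upper: int,
                       num_partitions: int) -> List[Optional[str]]:
    """Per-partition WHERE clauses for a partitioned JDBC read (sketch of Spark 1.x logic).

    The bounds only decide the stride; they do not filter rows. Rows below
    `lower` go to the first partition and rows above `upper` to the last one.
    """
    if num_partitions == 1:
        return [None]  # a single partition reads the whole table
    # Integer division as in the Scala code (Long arithmetic); Python's // matches
    # it for the non-negative bounds assumed here.
    stride = upper // num_partitions - lower // num_partitions
    clauses: List[Optional[str]] = []
    current = lower
    for i in range(num_partitions):
        lo = f"{column} >= {current}" if i != 0 else None
        current += stride
        hi = f"{column} < {current}" if i != num_partitions - 1 else None
        if hi is None:
            clauses.append(lo)
        elif lo is None:
            clauses.append(hi)
        else:
            clauses.append(f"{lo} AND {hi}")
    return clauses
```

If this matches what you see, a numPartitions no larger than upperBound - lowerBound (6 here), or a partition column with a wider and more even range, would give more than one non-empty partition.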

Thanks










On 14 July 2016 at 19:31, Jakub Stransky  wrote:

> HI Talebzadeh,
>
> Sorry, I forgot to answer the last part of your question:
>
> At O/S level you should see many CoarseGrainedExecutorBackend through jps
> each corresponding to one executor. Are they doing anything?
>
> There is one worker with one busy executor, and the rest are almost idle:
>
>   PID USER   PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
>  9305 spark  20   0 30.489g 5.075g  22256 S   0.3 18.5   0:36.25 java
>
> The only busy one is using all cores of the 8-core machine:
>
>   PID USER   PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
>  9580 zdata  20   0 29.664g 0.021t   6836 S 676.7 79.4  40:08.61 java
>
>
> Thanks
> Jakub
>
> On 14 July 2016 at 19:22, Jakub Stransky  wrote:
>
>> HI Talebzadeh,
>>
>> We are running 6 worker machines.
>>
>> We are reading the data through sqlContext (as a DataFrame), as the
>> documentation suggests, rather than through JdbcRDD.
>>
>> prop just specifies name, password, and driver class.
>>
>> Right after this data load we register it as a temp table
>>
>> val df_init = sqlContext.read
>>   .jdbc(
>> url = Configuration.dbUrl,
>> table = Configuration.dbTable,
>> prop
>>   )
>>
>> df_init.registerTempTable("df_init")
>>
>> Afterwards we do some data filtering, column selection, and row
>> filtering with sqlContext.sql("select statement here").
>>
>> After this selection we try to repartition the data to get it
>> distributed across the cluster, and that does not seem to work. Then we
>> persist the filtered and selected DataFrame.
>>
>> The desired state is that the filtered DataFrame is distributed across
>> the nodes in the cluster.
>>
>> Jakub
>>
>>
>>
>> On 14 July 2016 at 19:03, Mich Talebzadeh 
>> wrote:
>>
>>> Hi Jakub,
>>>
>>> Sounds like one executor. Can you point out:
>>>
>>>
>>>1. The number of slaves/workers you are running
>>>2. Are you using JDBC to read data in?
>>>3. Do you register DF as temp table and if so have you cached temp
>>>table
>>>
>>> Sounds like only one executor is active and the rest are sitting idle.
>>>
>>> At O/S level you should see many CoarseGrainedExecutorBackend through
>>> jps each corresponding to one executor. Are they doing anything?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh

Re: Spark Streaming Kinesis Performance Decrease When Cluster Scale Up with More Executors

2016-07-14 Thread Daniel Santana
Are you re-sharding your kinesis stream as well?

I had a similar problem and increasing the number of kinesis stream shards
solved it.

-- 
*Daniel Santana*
Senior Software Engineer

EVERY*MUNDO*
25 SE 2nd Ave., Suite 900
Miami, FL 33131 USA
main:+1 (305) 375-0045
EveryMundo.com 


On Thu, Jul 14, 2016 at 2:20 PM, Renxia Wang  wrote:

> Additional information: The batch duration in my app is 1 minute, from
> Spark UI, for each batch, the difference between Output Op Duration and Job
> Duration is big. E.g. Output Op Duration is 1min while Job Duration is 19s.
>
> 2016-07-14 10:49 GMT-07:00 Renxia Wang :
>
>> Hi all,
>>
>> I am running a Spark Streaming application with Kinesis on EMR 4.7.1. The
>> application runs on YARN and uses client mode. There are 17 worker nodes
>> (c3.8xlarge) with 100 executors and 100 receivers. This setting works fine.
>>
>> But when I increase the number of worker nodes to 50 and the number of
>> executors to 250, with 250 receivers, the batch processing time
>> increases from ~50s to 2.3min, and the maximum task scheduler delay
>> increases from ~0.2s to 20s (while the 75th percentile is about 2-3s).
>>
>> I tried increasing only the number of executors while keeping the number
>> of receivers, but I still see performance degrade from ~50s to 1.1min,
>> and the maximum task scheduler delay increases from ~0.2s to 4s (while
>> the 75th percentile is about 1s).
>>
>> The spark-submit command is as follows. The only parameter I changed is
>> --num-executors.
>>
>> spark-submit
>> --deploy-mode client
>> --verbose
>> --master yarn
>> --jars /usr/lib/spark/extras/lib/spark-streaming-kinesis-asl.jar
>> --driver-memory 20g --driver-cores 20
>> --num-executors 250
>> --executor-cores 5
>> --executor-memory 8g
>> --conf spark.yarn.executor.memoryOverhead=1600
>> --conf spark.driver.maxResultSize=0
>> --conf spark.dynamicAllocation.enabled=false
>> --conf spark.rdd.compress=true
>> --conf spark.streaming.stopGracefullyOnShutdown=true
>> --conf spark.streaming.backpressure.enabled=true
>> --conf spark.speculation=true
>> --conf spark.task.maxFailures=15
>> --conf spark.ui.retainedJobs=100
>> --conf spark.ui.retainedStages=100
>> --conf spark.executor.logs.rolling.maxRetainedFiles=1
>> --conf spark.executor.logs.rolling.strategy=time
>> --conf spark.executor.logs.rolling.time.interval=hourly
>> --conf spark.scheduler.mode=FAIR
>> --conf spark.scheduler.allocation.file=/home/hadoop/fairscheduler.xml
>> --conf spark.metrics.conf=/home/hadoop/spark-metrics.properties
>> --class Main /home/hadoop/Main-1.0.jar
>>
>> I found this issue seems relevant:
>> https://issues.apache.org/jira/browse/SPARK-14327
>>
>> Any suggestion for me to troubleshoot this issue?
>>
>> Thanks,
>>
>> Renxia
>>
>>
>


Re: Call http request from within Spark

2016-07-14 Thread Ted Yu
I second what Pedro said in his second paragraph.

Issuing an HTTP request per row would not scale.



Re: Call http request from within Spark

2016-07-14 Thread Pedro Rodriguez
Hi Amit,

Have you tried running a subset of the IDs locally on a single thread? It
would be useful to benchmark your getProfile function on a subset of the
data, estimate how long the full dataset would take, and then divide by the
number of Spark executor cores. This should at least serve as a sanity
check. If things are much slower than expected, is it possible that the
service has a per-IP-address rate limit that you are hitting?
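The sanity check above is simple arithmetic. A sketch with made-up numbers: the 20-second timing for 100 sample requests and the 40 executor cores are hypothetical; only the 300k customer ids come from Amit's mail.

```python
def estimate_runtime_seconds(sample_seconds: float, sample_size: int,
                             total_items: int, cores: int) -> float:
    """Extrapolate a single-threaded timing to the full dataset, split across cores."""
    per_item = sample_seconds / sample_size  # average latency per request
    # Assumes perfect parallelism and no per-IP rate limit on the service.
    return per_item * total_items / cores


# Hypothetical: 100 sample requests took 20 s on one thread; 300k ids over 40 cores.
print(estimate_runtime_seconds(20.0, 100, 300_000, 40) / 60, "minutes")
```

If the real job runs far longer than such an estimate, that points at something outside Spark, such as throttling by the service.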

If requests is more efficient when batching requests together (I don't know
much about its internal implementation and connection pools), you could do
that with mapPartitions. This is useful when initializing the resource used
by the function in the map call is expensive (e.g. a connection pool for a
database or web service), as it allows you to initialize that resource once
per partition and then reuse it for all the elements in the partition.
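A sketch of the mapPartitions idea in PySpark terms, assuming the per-partition resource is a `requests.Session`, which reuses connections across calls. `profiles_for_partition`, `make_session_fetcher`, `API_KEY` and `API_SECRET` are hypothetical names (the key/secret values are the placeholders from Amit's mail); the URI template is the one from his code. The partition function takes the fetcher factory as a parameter so it can be exercised without a cluster or network.

```python
from typing import Callable, Iterable, Iterator, Tuple

PROFILE_URI = "https://profile.localytics.com/x1/customers/{}"
API_KEY, API_SECRET = "txt", "yuyuy"  # placeholders, as in the original mail


def make_session_fetcher() -> Callable[[str], object]:
    """Hypothetical factory: one requests.Session per partition, reused for every id."""
    import requests  # imported lazily, i.e. on the executor
    from requests.auth import HTTPBasicAuth

    session = requests.Session()
    session.auth = HTTPBasicAuth(API_KEY, API_SECRET)

    def fetch(cust_id: str) -> object:
        response = session.get(PROFILE_URI.format(cust_id), timeout=30)
        return response.json()

    return fetch


def profiles_for_partition(lines: Iterable[str],
                           make_fetcher: Callable[[], Callable[[str], object]]
                           ) -> Iterator[Tuple[str, object]]:
    """Yield (customer id, profile) pairs, creating the client once per partition."""
    fetch = make_fetcher()  # expensive setup happens once here, not once per row
    for line in lines:
        cust_id = line.split(",")[0]
        if cust_id:  # skip blank ids
            yield cust_id, fetch(cust_id)

# With PySpark (not executed here):
# profile_rdd = custid_rdd.mapPartitions(
#     lambda it: profiles_for_partition(it, make_session_fetcher))
```

Returning parsed JSON rather than the raw response object keeps the RDD elements small and plain; the `timeout` keeps one hung request from stalling a whole task.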

Pedro

On Thu, Jul 14, 2016 at 8:52 AM, Amit Dutta  wrote:

> Hi All,
>
>
> I have a requirement to call a rest service url for 300k customer ids.
>
> Things I have tried so far is
>
>
> custid_rdd = sc.textFile('file:Users/zzz/CustomerID_SC/Inactive User Hashed LCID List.csv')  # getting all the customer ids and building adds
>
> profile_rdd = custid_rdd.map(lambda r: getProfile(r.split(',')[0]))
>
> profile_rdd.count()
>
> # getProfile is the method that does the HTTP call
> def getProfile(cust_id):
>     api_key = 'txt'
>     api_secret = 'yuyuy'
>     profile_uri = 'https://profile.localytics.com/x1/customers/{}'
>     customer_id = cust_id
>
>     if customer_id is not None:
>         data = requests.get(profile_uri.format(customer_id),
>                             auth=requests.auth.HTTPBasicAuth(api_key, api_secret))
>         # print json.dumps(data.json(), indent=4)
>         return data
>
>
> When I print the JSON dump of the data, I see it returning results from the
> REST call. But the count never finishes.
>
>
> Is there an efficient way of dealing with this? Some posts say we have to
> define a batch size etc., but I don't know how.
>
>
> Appreciate your help
>
>
> Regards,
>
> Amit
>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


HiveThriftServer and spark.sql.hive.thriftServer.singleSession setting

2016-07-14 Thread Chang Lim
Hi,

I am on the Spark 2.0 preview release. According to the Spark 2.0 docs, to share
a TempTable/View, I need to:
"to run the Thrift server in the old single-session mode, please set
option spark.sql.hive.thriftServer.singleSession to true."
Question: *When using HiveThriftServer2.startWithContext(), where do I set
the above setting?* I tried the following 4 places to set the flag, but
none of them seems to work. What am I missing?

val spark = SparkSession.builder.master("local[2]")
  .enableHiveSupport().appName("App")
  .config("spark.sql.hive.thriftServer.singleSession", "true")  // <-- 1. Is this the correct place to set it?
  .getOrCreate()

// Starts the Thrift server
//spark.conf.set("spark.sql.hive.thriftServer.singleSession", true)  // <-- 2. Tried this, doesn't seem to work
//spark.sparkContext.getConf.set("spark.sql.hive.thriftServer.singleSession", "true")  // <-- 3. Tried this, doesn't seem to work
val sql = new org.apache.spark.sql.SQLContext(spark.sparkContext)
sql.setConf("spark.sql.hive.thriftServer.singleSession", "true")  // <-- 4. Tried this, doesn't seem to work
HiveThriftServer2.startWithContext(sql)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveThriftServer-and-spark-sql-hive-thriftServer-singleSession-setting-tp27340.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: repartitionAndSortWithinPartitions HELP

2016-07-14 Thread Punit Naik
I meant that we could first sort the individual partitions and then sort
the whole RDD by merging them, a sort of divide-and-conquer approach.
Does sortByKey take care of all this internally?
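To make Koert's explanation in the quoted reply concrete, here is a plain-Python model (not Spark code) of partitioning by key and then sorting each partition. `partition_and_sort`, `hash_partitioner` and `range_partitioner` are hypothetical helpers; Spark's RangePartitioner picks its bounds by sampling the keys, whereas here they are fixed by hand. With hash partitioning each partition is sorted but their concatenation is not; with range partitioning the concatenation is globally sorted; merging already-sorted partitions also yields a global order, but only when all the data flows through one process.

```python
import bisect
import heapq
from itertools import chain
from typing import Callable, List, Sequence, Tuple

Record = Tuple[int, str]


def partition_and_sort(records: Sequence[Record], num_parts: int,
                       part_of: Callable[[int], int]) -> List[List[Record]]:
    """Model of repartitionAndSortWithinPartitions: route each record by key, then sort each partition."""
    parts: List[List[Record]] = [[] for _ in range(num_parts)]
    for key, value in records:
        parts[part_of(key)].append((key, value))
    # Sorts by key (and by value for ties, just to keep the model deterministic).
    return [sorted(p) for p in parts]


def hash_partitioner(num_parts: int) -> Callable[[int], int]:
    return lambda key: hash(key) % num_parts


def range_partitioner(upper_bounds: Sequence[int]) -> Callable[[int], int]:
    # Partition i holds keys <= upper_bounds[i]; larger keys go to the last partition,
    # so there are len(upper_bounds) + 1 partitions.
    return lambda key: bisect.bisect_left(upper_bounds, key)


def concatenate(parts: List[List[Record]]) -> List[Record]:
    return list(chain.from_iterable(parts))


def merge_sorted(parts: List[List[Record]]) -> List[Record]:
    # k-way merge of already-sorted partitions
    return list(heapq.merge(*parts))
```

This suggests that in Spark, calling repartitionAndSortWithinPartitions with a RangePartitioner, e.g. `rdd.repartitionAndSortWithinPartitions(new RangePartitioner(n, rdd))`, should give a totally ordered RDD in a single shuffle. That is my reading of the API rather than something confirmed in this thread.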



-- 
Thank You

Regards

Punit Naik


Re: repartitionAndSortWithinPartitions HELP

2016-07-14 Thread Punit Naik
Can we increase the sorting speed of an RDD by doing a secondary sort first?



-- 
Thank You

Regards

Punit Naik


Re: repartitionAndSortWithinPartitions HELP

2016-07-14 Thread Punit Naik
Okay. Can't I supply the same partitioner I used for
"repartitionAndSortWithinPartitions" as an argument to "sortByKey"?
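`sortByKey` takes an ordering flag and a number of partitions rather than a partitioner, but the reverse direction should work: passing a range partitioner (in Scala, something like `new RangePartitioner(numPartitions, rdd)`) to `repartitionAndSortWithinPartitions` gives a totally ordered RDD, which is essentially what `sortByKey` does. The plain-Python sketch below only models that idea. It picks boundaries from a sample of the keys, routes each record to the partition whose range contains its key, sorts each partition, and checks that reading the partitions in order yields sorted output. The boundary choice (evenly spaced positions in a sorted sample) is a simplification of what Spark's `RangePartitioner` actually does.

```python
import bisect
import random


def range_bounds(sample_keys, num_partitions):
    """Choose num_partitions - 1 upper bounds from a sample of keys."""
    s = sorted(sample_keys)
    return [s[len(s) * i // num_partitions] for i in range(1, num_partitions)]


def range_partition_and_sort(records, bounds):
    """Route each (key, value) to its key range, then sort each partition by key."""
    parts = [[] for _ in range(len(bounds) + 1)]
    for key, value in records:
        # keys in (bounds[i-1], bounds[i]] go to partition i
        parts[bisect.bisect_left(bounds, key)].append((key, value))
    return [sorted(p, key=lambda kv: kv[0]) for p in parts]


random.seed(7)
records = [(random.randint(0, 999), i) for i in range(1000)]
bounds = range_bounds(random.sample([k for k, _ in records], 100), 4)
parts = range_partition_and_sort(records, bounds)
flat = [k for p in parts for k, _ in p]
print(flat == sorted(k for k, _ in records))  # True: concatenation is globally sorted
```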



Re: Spark Streaming Kinesis Performance Decrease When Cluster Scale Up with More Executors

2016-07-14 Thread Renxia Wang
Additional information: the batch duration in my app is 1 minute. In the
Spark UI, for each batch, the difference between Output Op Duration and Job
Duration is big, e.g. Output Op Duration is 1 min while Job Duration is 19 s.



Re: repartitionAndSortWithinPartitions HELP

2016-07-14 Thread Koert Kuipers
repartitionAndSortWithinPartitions partitions the rdd and sorts within each
partition. so each partition is fully sorted, but the rdd is not sorted.

sortByKey is basically the same as repartitionAndSortWithinPartitions
except it uses a range partitioner so that the entire rdd is sorted.
however since sortByKey uses a different partitioner than
repartitionAndSortWithinPartitions you do not get much benefit from running
sortByKey after repartitionAndSortWithinPartitions (because all the data
will get shuffled again)
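The following is a small plain-Python model of the difference described above. With a hash-style partitioner, each partition comes out sorted after the within-partition sort, but reading the partitions one after another is not globally sorted. Spark's default `HashPartitioner` uses the key's `hashCode` modulo the number of partitions. Here, Python's `hash` of a small int, which is the int itself, stands in for it, so this is a simplification.

```python
def hash_partition_and_sort(records, num_partitions):
    """Assign each (key, value) to partition hash(key) % n, then sort each partition."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in records:
        parts[hash(key) % num_partitions].append((key, value))
    return [sorted(p) for p in parts]


records = [(k, str(k)) for k in [5, 3, 9, 1, 8, 2, 7, 4, 6]]
parts = hash_partition_and_sort(records, 3)
for p in parts:
    print([k for k, _ in p])  # each partition on its own is sorted
flat = [k for p in parts for k, _ in p]
print(flat)  # [3, 6, 9, 1, 4, 7, 2, 5, 8]: not globally sorted
```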




Re: repartitionAndSortWithinPartitions HELP

2016-07-14 Thread Punit Naik
Hi Koert

I have already used "repartitionAndSortWithinPartitions" for secondary
sorting and it works fine. Just wanted to know whether it will sort the
entire RDD or not.



-- 
Thank You

Regards

Punit Naik


Re: repartitionAndSortWithinPartitions HELP

2016-07-14 Thread Koert Kuipers
repartitionAndSortWithinPartitions sorts by key, not by values per key, so it is
not really a secondary sort by itself.

for secondary sort also check out:
https://github.com/tresata/spark-sorted
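Here is the usual secondary-sort pattern, sketched in plain Python. The value you want ordered moves into a composite key `(primary, secondary)`. Partitioning looks only at the primary key (in Spark this would be a custom `Partitioner` whose `getPartition` inspects the first element of the composite key), and the within-partition sort orders by the full composite key. All records for one primary key then land in one partition, already ordered by the secondary key. The helper names are illustrative and are not a Spark or spark-sorted API. `crc32` replaces Python's `hash` only so that the partition assignment is reproducible.

```python
import zlib
from itertools import groupby


def partition_for(primary, num_partitions):
    """Pick a partition from the primary key only, so a group is never split."""
    return zlib.crc32(str(primary).encode("utf-8")) % num_partitions


def secondary_sort(records, num_partitions):
    """records: (primary, secondary, payload) triples -> sorted partitions."""
    parts = [[] for _ in range(num_partitions)]
    for primary, secondary, payload in records:
        parts[partition_for(primary, num_partitions)].append(((primary, secondary), payload))
    # the within-partition sort uses the full composite key
    return [sorted(p, key=lambda kv: kv[0]) for p in parts]


def values_per_key(partition):
    """Yield (primary, [secondary, ...]) groups from one sorted partition."""
    for primary, group in groupby(partition, key=lambda kv: kv[0][0]):
        yield primary, [composite[1] for composite, _ in group]


records = [("b", 3, "x"), ("a", 2, "y"), ("b", 1, "z"), ("a", 5, "w"), ("a", 1, "v")]
parts = secondary_sort(records, 2)
for p in parts:
    print(list(values_per_key(p)))
```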




Spark Streaming Kinesis Performance Decrease When Cluster Scale Up with More Executors

2016-07-14 Thread Renxia Wang
Hi all,

I am running a Spark Streaming application with Kinesis on EMR 4.7.1. The
application runs on YARN in client mode. There are 17 worker nodes
(c3.8xlarge) with 100 executors and 100 receivers. This setup works fine.

But when I increase the number of worker nodes to 50 and the number of
executors to 250, with 250 receivers, the processing time of
batches increases from ~50s to 2.3min, and the scheduler delay for tasks
increases from ~0.2s max to 20s max (while the 75th percentile is about 2-3s).

I tried increasing only the number of executors while keeping the number of
receivers the same, but I still see performance degrade from ~50s to 1.1min,
and the task scheduler delay increased from ~0.2s max to 4s max (while
the 75th percentile is about 1s).

The spark-submit command is as follows. The only parameter I changed is
num-executors.

spark-submit \
  --deploy-mode client \
  --verbose \
  --master yarn \
  --jars /usr/lib/spark/extras/lib/spark-streaming-kinesis-asl.jar \
  --driver-memory 20g --driver-cores 20 \
  --num-executors 250 \
  --executor-cores 5 \
  --executor-memory 8g \
  --conf spark.yarn.executor.memoryOverhead=1600 \
  --conf spark.driver.maxResultSize=0 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.rdd.compress=true \
  --conf spark.streaming.stopGracefullyOnShutdown=true \
  --conf spark.streaming.backpressure.enabled=true \
  --conf spark.speculation=true \
  --conf spark.task.maxFailures=15 \
  --conf spark.ui.retainedJobs=100 \
  --conf spark.ui.retainedStages=100 \
  --conf spark.executor.logs.rolling.maxRetainedFiles=1 \
  --conf spark.executor.logs.rolling.strategy=time \
  --conf spark.executor.logs.rolling.time.interval=hourly \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.scheduler.allocation.file=/home/hadoop/fairscheduler.xml \
  --conf spark.metrics.conf=/home/hadoop/spark-metrics.properties \
  --class Main /home/hadoop/Main-1.0.jar

I found this issue, which seems relevant:
https://issues.apache.org/jira/browse/SPARK-14327

Any suggestions on how to troubleshoot this issue?

Thanks,

Renxia


Re: Standalone cluster node utilization

2016-07-14 Thread Jakub Stransky
Hi Talebzadeh,

Sorry, I forgot to answer the last part of your question:

> At O/S level you should see many CoarseGrainedExecutorBackend through jps
> each corresponding to one executor. Are they doing anything?

There is one worker with a busy executor; the rest are almost idle, for example:

  PID USER  PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
 9305 spark 20   0 30.489g 5.075g  22256 S   0.3 18.5   0:36.25 java

The only busy one is using all the cores of the 8-core machine:

  PID USER  PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
 9580 zdata 20   0 29.664g 0.021t   6836 S 676.7 79.4  40:08.61 java


Thanks
Jakub



-- 
Jakub Stransky
cz.linkedin.com/in/jakubstransky


Re: Standalone cluster node utilization

2016-07-14 Thread Jakub Stransky
Hi Talebzadeh,

We are using 6 worker machines, all running.

We read the data through sqlContext (as a DataFrame), as the documentation
suggests, rather than through JdbcRDD.

prop just specifies the user name, password, and driver class.

Right after the data load we register it as a temp table:

val df_init = sqlContext.read
  .jdbc(
    url = Configuration.dbUrl,
    table = Configuration.dbTable,
    properties = prop
  )

df_init.registerTempTable("df_init")

Afterwards we do some data filtering, column selection, and row filtering
with sqlContext.sql("select statement here").

After this selection we try to repartition the data so that it is
distributed across the cluster, but that does not seem to work. Then we
persist the filtered and selected DataFrame.

The desired state is that the filtered DataFrame is distributed across the
nodes in the cluster.

Jakub





-- 
Jakub Stransky
cz.linkedin.com/in/jakubstransky


repartitionAndSortWithinPartitions HELP

2016-07-14 Thread Punit Naik
Hi guys

In my Spark/Scala code I am implementing a secondary sort. I wanted to know:
when I call the "repartitionAndSortWithinPartitions" method, will the whole
(entire) RDD be sorted, or only the individual partitions?
If it's the latter, will applying a "sortByKey" after
"repartitionAndSortWithinPartitions" be faster now that the individual
partitions are sorted?

-- 
Thank You

Regards

Punit Naik


Re: Standalone cluster node utilization

2016-07-14 Thread Mich Talebzadeh
Hi Jakub,

Sounds like one executor. Can you point out:


   1. The number of slaves/workers you are running
   2. Are you using JDBC to read data in?
   3. Do you register DF as temp table and if so have you cached temp table

Sounds like only one executor is active and the rest are sitting idle.

At O/S level you should see many CoarseGrainedExecutorBackend processes through jps,
each corresponding to one executor. Are they doing anything?

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.





Re: Standalone cluster node utilization

2016-07-14 Thread Zhou (Joe) Xing

I have seen similar behavior in my standalone cluster. I tried increasing the
number of partitions, and at some point it seems that all the executors or worker
nodes start to make parallel connections to the remote data store. But it would be
nice if someone could point us to some references on how to make proper use of
repartitioning for data read by Spark SQL from a remote data store. Thanks a lot.

zhou






-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Difference JavaReceiverInputDStream and JavaDStream

2016-07-14 Thread Paolo Patierno
Hi all,

What is the difference between JavaReceiverInputDStream and JavaDStream?

I see that the latter is always used in all custom receivers when
createStream is going to be used from Python.

Thanks,
Paolo.

  

Standalone cluster node utilization

2016-07-14 Thread Jakub Stransky
Hello,

I have a Spark cluster running in standalone mode: master + 6 executors.

My application reads data from a database via DataFrame.read, then
filters rows. After that I repartition the data, and I wonder
why on the Executors page of the driver UI I see the RDD blocks all allocated
on a single executor machine.

[image: Inline images 1]
As highlighted in the picture above, I expected that after repartitioning
the data would be shuffled across the cluster, but that is obviously not
happening here.

I understand that the database read happens in a non-parallel fashion,
but repartitioning should fix that, as far as I understand.
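For reference, Spark's `DataFrameReader.jdbc` also has an overload that splits the read itself into parallel queries: `jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, connectionProperties)`, where `columnName` is a numeric column. A plain `jdbc(url, table, properties)` read typically comes back as a single partition. The plain-Python sketch below only approximates the kind of per-partition WHERE clauses such a read issues, and the exact clauses Spark generates may differ between versions. The bounds set the stride but do not filter rows: values outside `[lowerBound, upperBound)` fall into the first or last partition. A list of predicates like these could also be passed to the `jdbc(url, table, predicates, connectionProperties)` overload. The column name `id` is hypothetical.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the WHERE clauses of a partitioned JDBC read.

    The first clause is open below (and also takes NULLs), and the last is open
    above, so no rows are filtered out by the bounds themselves.
    """
    if num_partitions <= 1:
        return ["1 = 1"]  # a single partition reads the whole table
    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        start, current = current, current + stride
        if i == 0:
            predicates.append(f"{column} < {current} OR {column} IS NULL")
        elif i == num_partitions - 1:
            predicates.append(f"{column} >= {start}")
        else:
            predicates.append(f"{column} >= {start} AND {column} < {current}")
    return predicates


for p in jdbc_partition_predicates("id", 0, 100, 4):
    print(p)
```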

Could someone experienced clarify that?

Thanks


Call http request from within Spark

2016-07-14 Thread Amit Dutta
Hi All,


I have a requirement to call a REST service URL for 300k customer IDs.

Here is what I have tried so far:


import requests

# getProfile is the method that does the HTTP call
def getProfile(cust_id):
    api_key = 'txt'
    api_secret = 'yuyuy'
    profile_uri = 'https://profile.localytics.com/x1/customers/{}'
    customer_id = cust_id

    if customer_id is not None:
        data = requests.get(profile_uri.format(customer_id),
                            auth=requests.auth.HTTPBasicAuth(api_key, api_secret))
        # print(json.dumps(data.json(), indent=4))
        return data

# get all the customer IDs into an RDD
custid_rdd = sc.textFile('file:Users/zzz/CustomerID_SC/Inactive User Hashed LCID List.csv')

profile_rdd = custid_rdd.map(lambda r: getProfile(r.split(',')[0]))

profile_rdd.count()


When I print the JSON dump of the data, I see results coming back from the
REST call, but the count never finishes.


Is there an efficient way of dealing with this? Some posts say we have to
define a batch size, but I don't know how.
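One common way to structure this is sketched below, under some assumptions. Use `mapPartitions` instead of `map` so that each partition opens one HTTP session and reuses it for all of its IDs. Always pass a request timeout, because `requests` does not time out by default, and a single hung connection can stall a task indefinitely. It is also worth checking how many partitions `custid_rdd` has: a single small local text file may yield only a couple of partitions, so the 300k calls run nearly sequentially, and `repartition(n)` can spread them out. Chunking IDs into batches only pays off if the service offers a batch endpoint or if you want to throttle between chunks. With a one-ID-per-request API, the main gains come from the reused session, the timeout, and more partitions. `chunked`, `fetch_profiles`, and `make_session_fetcher` are hypothetical helpers. Only the core logic is exercised here, with a fake fetcher.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def fetch_profiles(lines, fetch_batch, batch_size=100):
    """Process one partition: take the first CSV field as the customer id,
    skip blank lines, and call fetch_batch once per chunk of ids."""
    ids = (line.split(',')[0] for line in lines if line.strip())
    for batch in chunked(ids, batch_size):
        yield from fetch_batch(batch)


def make_session_fetcher(api_key, api_secret, uri_template, timeout=10):
    """Hypothetical helper: one requests.Session per partition, reused for
    every call, with basic auth and a per-request timeout in seconds."""
    import requests  # imported here, i.e. where the partition is processed
    session = requests.Session()
    session.auth = (api_key, api_secret)

    def fetch_batch(batch):
        for cust_id in batch:
            resp = session.get(uri_template.format(cust_id), timeout=timeout)
            yield cust_id, resp.status_code
    return fetch_batch

# Hypothetical PySpark usage (sc, api_key, api_secret, profile_uri as in the post):
# profile_rdd = custid_rdd.repartition(50).mapPartitions(
#     lambda lines: fetch_profiles(
#         lines, make_session_fetcher(api_key, api_secret, profile_uri)))
# profile_rdd.count()
```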


Appreciate your help


Regards,

Amit


ranks and cubes

2016-07-14 Thread talgr
I have a dataframe with a few dimensions, for example:


I want to build a cube on i,j,k,  and get a rank based on total per row (per
grouping)
so that when doing:
df.filter('i===3 && 'j===1).show
I will get 


So basically, for every grouping combination (i,j,k; i,j; i,k; i; j,k; j; k),
I need a separate dense rank list.

Any ideas?

(in this example, total = i*j*k  )




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ranks-and-cubes-tp27338.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: ranks and cubes

2016-07-14 Thread talgr
I'm posting again, as the tables are not showing up in the emails.

I have a dataframe with a few dimensions, for example:

+---+---+---+-----+
|  i|  j|  k|total|
+---+---+---+-----+
|  3|  1|  1|    3|
|  3|  1|  2|    6|
|  3|  1|  3|    9|
|  3|  1|  4|   12|
|  3|  1|  5|   15|
|  3|  1|  6|   18|
|  3|  1|  7|   21|
|  3|  1|  8|   24|
|  3|  1|  9|   27|
|  3|  2|  1|    6|
|  3|  2|  2|   12|
|  3|  2|  3|   18|
|  3|  2|  4|   24|
|  3|  2|  5|   30|
|  3|  2|  6|   36|
|  3|  2|  7|   42|
|  3|  2|  8|   48|
|  3|  2|  9|   54|
|  3|  3|  1|    9|
|  3|  3|  2|   18|
+---+---+---+-----+

I want to build a cube on i,j,k,  and get a rank based on total per row (per
grouping)
so that when doing:
df.filter('i===3 && 'j===1).show
I will get 

+---+---+----+-----+----+
|  i|  j|   k|total|rank|
+---+---+----+-----+----+
|  3|  1|null|  135|   1|
|  3|  1|   0|    0|  10|
|  3|  1|   1|    3|   9|
|  3|  1|   2|    6|   8|
|  3|  1|   3|    9|   7|
|  3|  1|   4|   12|   6|
|  3|  1|   5|   15|   5|
|  3|  1|   6|   18|   4|
|  3|  1|   7|   21|   3|
|  3|  1|   8|   24|   2|
|  3|  1|   9|   27|   1|
+---+---+----+-----+----+



So basically, for every grouping combination (i,j,k; i,j; i,k; i; j,k; j; k),
I need a separate dense rank list.

Any ideas?

(in this example, total = i*j*k  )
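In Spark this would likely combine `cube("i", "j", "k")`, a sum aggregate, and a `dense_rank()` window function. The plain-Python sketch below only pins down the semantics, and it rests on one interpretation of the example. Each aggregated row is dense-ranked (1 = largest total) among rows of the same grouping set that share all but that set's last grouping column. This reproduces the ranks of the k rows in the example table. The grand total is omitted to match the list of groupings in the question, and rolled-up columns are `None`. In the made-up input below only j = 1 exists, so the (i, j) subtotal ranks 1 trivially.

```python
from collections import defaultdict
from itertools import combinations


def cube_with_dense_rank(rows, dims, measure):
    """Aggregate `measure` over every non-empty grouping set of `dims` and add a
    dense rank within each grouping set, partitioned by all but its last column."""
    out = []
    for size in range(len(dims), 0, -1):
        for gset in combinations(dims, size):
            totals = defaultdict(int)
            for row in rows:
                totals[tuple(row[d] for d in gset)] += row[measure]
            # group aggregated keys by their parent prefix (all but the last column)
            by_parent = defaultdict(list)
            for key, total in totals.items():
                by_parent[key[:-1]].append((key, total))
            for members in by_parent.values():
                distinct = sorted({total for _, total in members}, reverse=True)
                rank_of = {total: pos + 1 for pos, total in enumerate(distinct)}
                for key, total in members:
                    rec = dict.fromkeys(dims)   # every column starts as None
                    rec.update(zip(gset, key))  # fill in the grouped columns
                    rec[measure] = total
                    rec["rank"] = rank_of[total]
                    out.append(rec)
    return out


rows = [{"i": 3, "j": 1, "k": k, "total": 3 * k} for k in range(1, 10)]
result = cube_with_dense_rank(rows, ["i", "j", "k"], "total")
for rec in result:
    if rec["i"] == 3 and rec["j"] == 1:
        print(rec)
```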



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ranks-and-cubes-tp27338p27339.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Dense Vectors outputs in feature engineering

2016-07-14 Thread rachmaninovquartet
or would it be common practice to just retain the original categories in
another df?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dense-Vectors-outputs-in-feature-engineering-tp27331p27337.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Dense Vectors outputs in feature engineering

2016-07-14 Thread rachmaninovquartet
Thanks Disha, that worked out well. Can you point me to an example of how to
decode my feature vectors in the dataframe, back into their categories?
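If the vectors came from `StringIndexer` followed by `OneHotEncoder`, Spark ML's `IndexToString` transformer can map an index column back to labels using the fitted indexer's `labels`. For the one-hot vectors themselves, the decoding logic is simple enough to sketch in plain Python: find the single non-zero slot and look it up in the label list. One caveat is that `OneHotEncoder` drops the last category by default (`dropLast` is true), so an all-zero vector encodes the last label. The vector can be a plain list or the result of `vector.toArray()`, and the labels below are made-up example data.

```python
def decode_one_hot(vector, labels, drop_last=True):
    """Map a one-hot vector back to its category label.

    labels: category labels in index order (e.g. from a fitted StringIndexer).
    drop_last: the vector has len(labels) - 1 slots, and all zeros means the
    last label.
    """
    expected = len(labels) - 1 if drop_last else len(labels)
    if len(vector) != expected:
        raise ValueError(f"expected {expected} slots, got {len(vector)}")
    hot = [i for i, v in enumerate(vector) if v != 0]
    if not hot and drop_last:
        return labels[-1]
    if len(hot) != 1:
        raise ValueError(f"not a one-hot vector: {vector}")
    return labels[hot[0]]


labels = ["red", "green", "blue"]
print(decode_one_hot([1.0, 0.0], labels))                        # red
print(decode_one_hot([0.0, 0.0], labels))                        # blue
print(decode_one_hot([0.0, 1.0, 0.0], labels, drop_last=False))  # green
```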





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dense-Vectors-outputs-in-feature-engineering-tp27331p27336.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Is it possible to send CSVSink metrics to HDFS

2016-07-14 Thread johnbutcher
Hi,

(first ever post)

I am experimenting with a Cloudera CDH5 cluster with Spark 1.5.0.
I have tried enabling the CSVSink metrics, which seems to work when writing to
Linux directories such as /tmp.
However, I'm getting errors when trying to write to an HDFS directory.
Is it possible to use HDFS?

Error message from spark-submit:

16/07/14 11:43:39.489 WARN CsvReporter: Error writing to
application_1466718205476_2664.driver.jvm.total.max
java.io.IOException: No such file or directory

Code extract:

import org.apache.spark.SparkConf

val metrics = "hdfs://moonshot-ha-nameservice/user/jab31/metrics/"
val conf = new SparkConf()
  .setAppName("John's Evil Experiments")
  .set("spark.metrics.conf.*.sink.csv.class", "org.apache.spark.metrics.sink.CsvSink")
  .set("spark.metrics.conf.*.sink.csv.period", "1")
  .set("spark.metrics.conf.*.sink.csv.unit", "seconds")
  .set("spark.metrics.conf.*.sink.csv.directory", metrics)
  .set("spark.metrics.conf.worker.sink.csv.period", "1")
  .set("spark.metrics.conf.worker.sink.csv.unit", "seconds")
  .set("spark.metrics.conf.master.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
  .set("spark.metrics.conf.worker.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
  .set("spark.metrics.conf.driver.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
  .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")

HDFS directory:

# hadoop fs -ls hdfs://moonshot-ha-nameservice/user/jab31/metrics/
Found 1 items
-rw-r--r--   3 jab31 msc          4 2016-07-14 11:42 hdfs://moonshot-ha-nameservice/user/jab31/metrics/test.txt

Regards,

John





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-send-CSVSink-metrics-to-HDFS-tp27335.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Issue in spark job. Remote rpc client dissociated

2016-07-14 Thread Balachandar R.A.
Hello,

The variable argsList is an array defined above the parallel block. This
variable is accessed inside the map function. Launcher.main is not thread-safe.
Is it not possible to tell Spark that every folder needs to be
processed as a separate process in a separate working directory?
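Two separate issues are involved here. First, the map function writes into the shared `argsList` array (`argsList[3] = argsList[3] + "/" + folder`). When one executor JVM runs several tasks, each task sees paths already extended by earlier tasks, and concurrent tasks race on the same array. Building a fresh copy per task (in Java, `argsList.clone()`) avoids that. Second, if `Launcher.main` itself is not thread-safe, one option is to run each folder in its own OS process with its own working directory. Another is to limit each executor to one concurrent task, for example by setting `spark.task.cpus` equal to the executor's core count, though sequential calls in one JVM would still share any static state. The Python sketch below shows the copy and the isolated process. `build_args` and `run_isolated` are hypothetical helpers, the argument list is made up, and the child command is a placeholder rather than the real launcher CLI.

```python
import os
import subprocess
import sys
import tempfile


def build_args(base_args, folder):
    """Return a per-task copy of the argument list; base_args is never mutated."""
    args = list(base_args)
    args[3] = f"{base_args[3]}/{folder}"      # input folder
    args[7] = f"{base_args[7]}/{folder}.csv"  # output CSV
    return args


def run_isolated(cmd, workdir):
    """Run cmd as a separate OS process in its own working directory."""
    result = subprocess.run(cmd, cwd=workdir, capture_output=True, text=True)
    return result.returncode, result.stdout


base = ["-a", "x", "-in", "/shared/data", "-b", "y", "-out", "/shared/out"]
a = build_args(base, "20161307")
b = build_args(base, "20161308")
print(a[3], a[7])  # /shared/data/20161307 /shared/out/20161307.csv
print(b[3])        # /shared/data/20161308, not .../20161307/20161308
print(base[3])     # /shared/data (unchanged)

# Placeholder child process; the real launcher command is hypothetical, e.g.
# ["java", "-cp", "launcher.jar", "Launcher", *a].
with tempfile.TemporaryDirectory() as wd:
    code, out = run_isolated([sys.executable, "-c", "import os; print(os.getcwd())"], wd)
    same_dir = out.strip() == os.path.realpath(wd)
    print(code, same_dir)
```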

Regards
Bala


Re: Issue in spark job. Remote rpc client dissociated

2016-07-14 Thread Sun Rui
Where is argsList defined? Is Launcher.main() thread-safe? Note that if
multiple folders are processed on a node, multiple threads may concurrently run
in the executor, each processing a folder.

> On Jul 14, 2016, at 12:28, Balachandar R.A.  wrote:
> 
> Hello Ted, 
> 
> 
> Thanks for the response. Here is the additional information.
>  
> I am using Spark 1.6.1 (spark-1.6.1-bin-hadoop2.6)
>  
> Here is the code snippet
>  
>  
> JavaRDD<File> add = jsc.parallelize(listFolders, listFolders.size());
> JavaRDD<Integer> test = add.map(new Function<File, Integer>() {
>     @Override
>     public Integer call(File file) throws Exception {
>         String folder = file.getName();
>         System.out.println("[x] Processing dataset from the directory " + folder);
>         int status = 0;
>         // Full path of the input folder. The input folder is on a shared file
>         // system that every worker node can access, something like
>         // “/home/user/software/data/”, and the folder name looks like “20161307”.
>         argsList[3] = argsList[3] + "/" + folder;
>         // Full path of the output.
>         argsList[7] = argsList[7] + "/" + folder + ".csv";
>         try {
>             // Launcher is a black box. It processes the input folder and creates
>             // a CSV file at the output location (argsList[7]), which is also on
>             // a shared file system.
>             Launcher.main(argsList);
>             status = 0;
>         }
>         catch (Exception e)
>         {
>             System.out.println("[x] Execution of import tool for the directory " + folder + " failed");
>             status = 0;
>         }
>         accum.add(1);
>         return status;
>     }
> });
>  
>  
> Here is the spark-env.sh
>  
> export SPARK_WORKER_INSTANCES=1
> export JAVA_HOME=/home/work_IW1/opt/jdk1.8.0_77/
> export HADOOP_CONF_DIR=/home/work_IW1/opt/hadoop-2.7.2/etc/hadoop
>  
> Here is the spark-defaults.conf
>  
>  
>   spark.master spark://master:7077
>   spark.eventLog.enabled   true
>   spark.eventLog.dir   hdfs://master:9000/sparkEvent
>   spark.serializer org.apache.spark.serializer.KryoSerializer
>   spark.driver.memory  4g
>  
> 
> 
> Hope it helps. 



Re: Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-07-14 Thread CosminC
I didn't have time to investigate much further, but the one thing that
stood out is that partitioning was no longer working on 1.6.1. That would
definitely explain the 2x performance loss.

Checking the Spark 1.5.1 logs for the same application showed that our
partitioner was working correctly: after the DStream / RDD creation, a user
session was processed on a single machine only. On 1.6.1, however, the same
session was processed on up to 4 machines of a 5-node cluster (including the
driver), with a lot of redundant operations. We use a custom but very simple
partitioner that extends HashPartitioner. It partitions on a case class with
a single string parameter.
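For context, here is a sketch of what a HashPartitioner-based partitioner computes for such a key. It is plain Java rather than Spark. SessionKey is a hypothetical stand-in for the single-string case class, and partitionFor mirrors HashPartitioner's rule: null keys go to partition 0, and every other key goes to a non-negative modulo of its hashCode. It shows that equal keys should always map to the same partition index. Which machine a given partition runs on is a separate scheduling decision.

```java
public class SessionPartitioning {
    // Hypothetical stand-in for a Scala case class with one String field:
    // equals and hashCode depend only on that field, as they would for a case class.
    static final class SessionKey {
        final String id;

        SessionKey(String id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof SessionKey && ((SessionKey) o).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    // Same rule as HashPartitioner: null keys go to partition 0,
    // otherwise a non-negative modulo of key.hashCode().
    static int partitionFor(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        int raw = key.hashCode() % numPartitions;
        return raw < 0 ? raw + numPartitions : raw;
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        // Two separately created keys for the same session...
        SessionKey first = new SessionKey("user-123");
        SessionKey second = new SessionKey("user-123");
        // ...map to the same partition index, so this prints true.
        System.out.println(partitionFor(first, numPartitions) == partitionFor(second, numPartitions));
    }
}
```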

Speculative execution is turned off by default, and we never enabled it,
so that is not the cause.

Right now we're postponing any Spark upgrade, and we'll probably try to
upgrade directly to Spark 2.0, hoping the partitioning issue is no longer
present there.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-tp27056p27334.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
