Windowed stream operations -- These are too lazy for some use cases

2015-08-20 Thread Justin Grimes
We are aggregating real time logs of events, and want to do windows of 30
minutes. However, since the computation doesn't start until 30 minutes have
passed, there is a ton of data built up that processing could've already
started on. When it comes time to actually process the data, there is too
much for our cluster to handle at once.

The basic idea is this:

 val mergedMain = mergedStream
  .flatMap(r => ...) // denormalize data for this particular output stream
  .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches
  .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 180, 180) // sum over the windows
  .map(rec => ...) // convert data to other format
  .foreachRDD { (rdd, time) =>
    rdd.saveAsTextFile(...) // save to text files
  }

I would want the batches to be reduced as soon as they arrive (the first
reduceByKey), since there isn't any reason to wait. Instead all of the
unprocessed data has to be processed at the same time (this data is being
heavily denormalized in some cases, and so generates a bunch of additional
data).

Thanks for any help.


Re: How to list all dataframes and RDDs available in current session?

2015-08-20 Thread Dhaval Patel
Apologies

I accidentally included Spark User DL on BCC. The actual email message is
below.
=


Hi:

I have been working on a few examples using Zeppelin.

I have been trying to find a command that would list all *dataframes/RDDs*
that have been created in the current session. Does anyone know if any such
command is available?

Something similar to the SparkSQL command to list all temp tables:
  show tables;

Thanks,
Dhaval



On Thu, Aug 20, 2015 at 12:49 PM, Dhaval Patel dhaval1...@gmail.com wrote:

 Hi:

 I have been working on few example using zeppelin.

 I have been trying to find a command that would list all *dataframes/RDDs*
 that has been created in current session. Anyone knows if there is any such
 commands available?

 Something similar to SparkSQL to list all temp tables :
   show tables;

 Thanks,
 Dhaval



Run scala code with spark submit

2015-08-20 Thread MasterSergius
Is there any possibility to run a standalone Scala program via spark-submit? Or
do I always have to put it in a package and build it with Maven (or sbt)?

What if I have just a simple program, like the example word counter?
Could anyone please show it on this simple test file, Greeting.scala:



It compiles with scalac and runs with scala. Now I want to run it with Spark (I
can get these files via wget, for example).





So, how can I run a one-file Scala program via spark-submit?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Run-scala-code-with-spark-submit-tp24367.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Run scala code with spark submit

2015-08-20 Thread Dean Wampler
I haven't tried it, but spark-shell should work if you give it a Scala
script file, since it's basically a wrapper around the Scala REPL.
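For reference, a minimal sketch of what that looks like (assuming Greeting.scala is written
as a script with top-level statements, so it can reuse the `sc` the shell provides; the file
name is a placeholder):

  spark-shell -i Greeting.scala

If the file instead defines an object with a main method, it has to be compiled and
packaged first (e.g. with scalac or sbt package) and then passed to spark-submit --class.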

dean

On Thursday, August 20, 2015, MasterSergius master.serg...@gmail.com
wrote:

 Is there any possibility to run standalone scala program via spark submit?
 Or
 I have always put it in some packages, build it with maven (or sbt)?

 What if I have just simple program, like that example word counter?
 Could anyone please, show it on this simple test file Greeting.scala:



 It comiles with scalac, runs with scala. Now I want to run in with spark (I
 can get these files via wget, for example)





 So, how I can run via spark-submit one-filer scala program?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Run-scala-code-with-spark-submit-tp24367.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org javascript:;
 For additional commands, e-mail: user-h...@spark.apache.org javascript:;



-- 
Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com


Re: insert overwrite table phonesall in spark-sql resulted in java.io.StreamCorruptedException

2015-08-20 Thread John Jay
The answer is that my table was not serialized with Kryo, but I started the
spark-sql shell with Kryo, so the data could not be deserialized.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/insert-overwrite-table-phonesall-in-spark-sql-resulted-in-java-io-StreamCorruptedException-tp23579p24354.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: persist for DStream

2015-08-20 Thread Hemant Bhanawat
Are you asking for something more than this?

http://spark.apache.org/docs/latest/streaming-programming-guide.html#caching--persistence
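For illustration, a minimal sketch of calling persist() on a Kafka DStream as described
there (a sketch only; the ZooKeeper quorum, consumer group and topic are placeholders, and
it needs the spark-streaming-kafka artifact on the classpath):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("PersistDStream"), Seconds(10))

// receiver-based Kafka stream; keep only the message payloads
val lines = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("events" -> 1))
  .map(_._2)

lines.persist(StorageLevel.MEMORY_ONLY) // every RDD generated by this DStream will be cached

lines.count().print()                     // first use of the cached data
lines.filter(_.contains("ERROR")).print() // reuses the cached RDDs instead of re-reading Kafka

ssc.start()
ssc.awaitTermination()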



On Thu, Aug 20, 2015 at 2:09 PM, Deepesh Maheshwari 
deepesh.maheshwar...@gmail.com wrote:

 Hi,

 there are function available tp cache() or persist() RDD in memory but i
 am reading data from kafka in form of DStream and applying operation it and
 i want to persist that DStream in memory for further.

 Please suggest method how i can persist DStream in memory.

 Regards,
 Deepesh



Transformation not happening for reduceByKey or GroupByKey

2015-08-20 Thread satish chandra j
HI All,
I have data in RDD as mentioned below:

RDD: Array[(Int, Int)] = Array((0,1), (0,2), (1,20), (1,30), (2,40))


I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on
Values for each key

Code:
RDD.reduceByKey((x, y) => x + y)
RDD.take(3)

Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at
<console>:73
res: Array[(Int,Int)] = Array()

Command as mentioned

dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


Please let me know what is missing in my code, as my resultant Array is
empty
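
For reference, a minimal sketch of the expected behaviour in a plain shell; note that
reduceByKey returns a new RDD (the receiving RDD is not modified in place), so the result
has to be captured before calling take/collect:

val rdd = sc.parallelize(Array((0, 1), (0, 2), (1, 20), (1, 30), (2, 40)))
val summed = rdd.reduceByKey(_ + _) // a NEW RDD; `rdd` itself is unchanged
summed.collect()                    // Array((0,3), (1,50), (2,40))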



Regards,
Satish


How to get the radius of clusters in spark K means

2015-08-20 Thread ashensw
We can get cluster centers in K-means clustering. Likewise, is there any
method in Spark to get the cluster radius?
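
As far as I know there is no radius accessor on KMeansModel, so here is a sketch of
computing it by hand, taking the radius as the maximum distance from a cluster's points to
its center (`data` is assumed to be the RDD[Vector] of training points and `model` a fitted
KMeansModel):

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

def clusterRadii(data: RDD[Vector], model: KMeansModel): Map[Int, Double] =
  data.map { point =>
    val cluster = model.predict(point)
    (cluster, Vectors.sqdist(point, model.clusterCenters(cluster)))
  }
  .reduceByKey(math.max(_, _)) // largest squared distance per cluster
  .mapValues(math.sqrt)        // back to a Euclidean radius
  .collect()
  .toMap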



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-radius-of-clusters-in-spark-K-means-tp24353.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkSQL concerning materials

2015-08-20 Thread Dawid Wysakowicz
Hi,

I would like to dig into SparkSQL and get to know the architecture, good
practices, and some of the internals better. Could you recommend some materials
on this matter?

Regards
Dawid


persist for DStream

2015-08-20 Thread Deepesh Maheshwari
Hi,

There are functions available to cache() or persist() an RDD in memory, but I am
reading data from Kafka in the form of a DStream and applying operations to it,
and I want to persist that DStream in memory for further use.

Please suggest how I can persist a DStream in memory.

Regards,
Deepesh


Re: spark kafka partitioning

2015-08-20 Thread ayan guha
If you have 1 topic, that means you have 1 DStream, which will have a
series of RDDs, one for each batch interval. In receiver-based integration,
there is no direct relationship between Kafka partitions and Spark partitions.
In the direct approach, one Spark partition is created for each Kafka partition.
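
As a small illustration of the direct approach described above (a sketch; the broker
address, topic name and batch interval are placeholders, and it needs the
spark-streaming-kafka artifact on the classpath):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("DirectKafka"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// one DStream for the topic; each generated RDD gets one partition per Kafka partition
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))

stream.foreachRDD { rdd =>
  println(s"Spark partitions in this batch: ${rdd.partitions.length}")
}

ssc.start()
ssc.awaitTermination()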

On Fri, Aug 21, 2015 at 12:48 PM, Gaurav Agarwal gaurav130...@gmail.com
wrote:

 Hello

 Regarding Spark Streaming and Kafka Partitioning

 When i send message on kafka topic with 3 partitions and listens on
 kafkareceiver with local value[4] . how will i come to know in Spark
 Streaming that different Dstreams are created according to partitions of
 kafka messages .

 Thanks




-- 
Best Regards,
Ayan Guha


Spark-Cassandra-connector

2015-08-20 Thread Samya
Hi All,

I need to write an RDD to Cassandra  using the sparkCassandraConnector from
DataStax. My application is using Yarn.

*Some basic Questions :*
1. Will a call to saveToCassandra(...) use the same connection
object across all tasks in a given executor? I mean, is there 1 (one)
connection object per executor that is shared between tasks?
2. If the above answer is YES, is there a way to create a connection pool for
each executor, so that multiple tasks can dump data to Cassandra in parallel?

Regards,
Samya



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cassandra-connector-tp24378.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.3. Insert into hive parquet partitioned table from DataFrame

2015-08-20 Thread Masf
Hi.

I have a DataFrame and I want to insert its data into a partitioned Parquet
table in Hive.

In Spark 1.4 I can use
df.write.partitionBy("x", "y").format("parquet").mode("append").saveAsTable("tbl_parquet")

but in Spark 1.3 I can't. How can I do it?
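
For reference, one workaround that is sometimes used on Spark 1.3 is to register the
DataFrame as a temp table and fall back to HiveQL's dynamic-partition insert (a sketch
only; the table, column and partition names are placeholders and a HiveContext is assumed):

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

df.registerTempTable("df_tmp")
// the partition columns (x, y) must come last in the SELECT for a dynamic-partition insert
hiveContext.sql(
  "INSERT INTO TABLE tbl_parquet PARTITION (x, y) SELECT col1, col2, x, y FROM df_tmp")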

Thanks

-- 
Regards
Miguel


Re: How to overwrite partition when writing Parquet?

2015-08-20 Thread Romi Kuntsman
Cheng - what if I want to overwrite a specific partition?

I'll try to remove the folder, as Hemant suggested...
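
A sketch of that manual approach, for reference (delete only that partition's directory,
then append just the matching data; the path, column name and date value are placeholders,
and `sc` and `myDataframe` are assumed from the thread):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

val parquetDir = "/data/mytable" // base directory used with partitionBy("date")
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path(s"$parquetDir/date=2015-08-20"), true) // recursive delete of one partition

myDataframe.filter(myDataframe("date") === "2015-08-20")
  .write.partitionBy("date").mode(SaveMode.Append).parquet(parquetDir)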

On Thu, Aug 20, 2015 at 1:17 PM Cheng Lian lian.cs@gmail.com wrote:

 You can apply a filter first to filter out data of needed dates and then
 append them.


 Cheng


 On 8/20/15 4:59 PM, Hemant Bhanawat wrote:

 How can I overwrite only a given partition or manually remove a partition
 before writing?

 I don't know if (and I don't think)  there is a way to do that using a
 mode. But doesn't manually deleting the directory of a particular partition
 help? For directory structure, check this out...


 http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery


 On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman r...@totango.com wrote:

 Hello,

 I have a DataFrame, with a date column which I want to use as a partition.
 Each day I want to write the data for the same date in Parquet, and then
 read a dataframe for a date range.

 I'm using:

 myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir);

 If I use SaveMode.Append, then writing data for the same partition adds
 the same data there again.
 If I use SaveMode.Overwrite, then writing data for a single partition
 removes all the data for all partitions.

 How can I overwrite only a given partition or manually remove a partition
 before writing?

 Many thanks!
 Romi K.






Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Dhaval Patel
Apologies, sent too early accidentally. Actual message is below


A dataframe has 2 date columns (datetime type) and I would like to add
another column that holds the difference between these two dates. A dataframe
snippet is below.

new_df.show(5)
+-----------+----------+--------------+
|      PATID|   SVCDATE|next_diag_date|
+-----------+----------+--------------+
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-27|
+-----------+----------+--------------+



Here is what I have tried so far:

- new_df.withColumn('SVCDATE2',
(new_df.next_diag_date-new_df.SVCDATE)).show()
Error: DateType does not support numeric operations

- new_df.withColumn('SVCDATE2',
(new_df.next_diag_date-new_df.SVCDATE).days).show()
Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);


However this simple python code works fine with pySpark:

from datetime import date
d0 = date(2008, 8, 18)
d1 = date(2008, 9, 26)
delta = d0 - d1
print (d0 - d1).days

# -39


Any suggestions would be appreciated! Also, is there a way to add a new
column to a dataframe without using a column expression (e.g. like in pandas or
R: df$new_col = 'new col value')?


Thanks,
Dhaval



On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel dhaval1...@gmail.com wrote:

 new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE).days).show()

 +---+--+--+ | PATID| SVCDATE|next_diag_date|
 +---+--+--+ |12345655545|2012-02-13|
 2012-02-13| |12345655545|2012-02-13| 2012-02-13| |12345655545|2012-02-13|
 2012-02-27| +---+--+--+



Re: How to overwrite partition when writing Parquet?

2015-08-20 Thread Cheng Lian
You can apply a filter first to filter out data of needed dates and then 
append them.


Cheng

On 8/20/15 4:59 PM, Hemant Bhanawat wrote:
How can I overwrite only a given partition or manually remove a 
partition before writing?


I don't know if (and I don't think)  there is a way to do that using a 
mode. But doesn't manually deleting the directory of a particular 
partition help? For directory structure, check this out...


http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery


On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman r...@totango.com 
mailto:r...@totango.com wrote:


Hello,

I have a DataFrame, with a date column which I want to use as a
partition.
Each day I want to write the data for the same date in Parquet,
and then read a dataframe for a date range.

I'm using:

myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir);

If I use SaveMode.Append, then writing data for the same partition
adds the same data there again.
If I use SaveMode.Overwrite, then writing data for a single
partition removes all the data for all partitions.

How can I overwrite only a given partition or manually remove a
partition before writing?

Many thanks!
Romi K.






[SparkR] How to perform a for loop on a DataFrame object

2015-08-20 Thread Florian M
Hi guys, 

First of all, thank you for your amazing work.

As you can see in the subject, I post here because I need to perform a for
loop on a DataFrame object. 

Sample of my Dataset (the entire dataset is ~400k lines long) : 

I use the 1.4.1 Spark version with R in 3.2.1

I launch sparkR using (the package can be found at
http://spark-packages.org/package/databricks/spark-csv )



I load my dataset from HDFS using the following command (the package is
needed to load a CSV in a Spark DataFrame): 



When I do a summary, the output is : 


What I need to do is to calculate :


But you probably know that we can't do this because the read.df function
returns an S4 object, which is not an iterable object.

Does anyone know how I can do that?
Maybe I have to convert the type of the DataFrame or use another function to
load my dataset...
I have to say that I'm new to Spark and SparkR :)

Thanks for your time,

Florian




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-How-to-perform-a-for-loop-on-a-DataFrame-object-tp24359.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Convert mllib.linalg.Matrix to Breeze

2015-08-20 Thread Naveen

Hi All,

Is there any way to convert an MLlib matrix to a Breeze DenseMatrix?
Any leads are appreciated.



Thanks,
Naveen

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Convert mllib.linalg.Matrix to Breeze

2015-08-20 Thread Yanbo Liang
You can use Matrix.toBreeze()
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L56
 .

2015-08-20 18:24 GMT+08:00 Naveen nav...@formcept.com:

 Hi All,

 Is there anyway to convert a mllib matrix to a Dense Matrix of Breeze? Any
 leads are appreciated.


 Thanks,
 Naveen

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




DataFrameWriter.jdbc is very slow

2015-08-20 Thread Aram Mkrtchyan
We want to migrate our data (approximately 20M rows) from Parquet to Postgres.
When we use the DataFrame writer's jdbc method the execution time is very
long; we have tried the same with batch inserts and that was much more effective.
Is it intentionally implemented in that way?
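
For context, a rough sketch of the kind of manual batch insert referred to above (the
connection URL, credentials, table and column types are placeholders; it assumes a
DataFrame `df` whose first column is a string and second an int):

import java.sql.DriverManager

df.foreachPartition { rows =>
  val conn = DriverManager.getConnection("jdbc:postgresql://host:5432/db", "user", "pass")
  conn.setAutoCommit(false)
  val stmt = conn.prepareStatement("INSERT INTO target_table (col1, col2) VALUES (?, ?)")
  rows.grouped(1000).foreach { batch =>      // send inserts in batches of 1000
    batch.foreach { row =>
      stmt.setString(1, row.getString(0))
      stmt.setInt(2, row.getInt(1))
      stmt.addBatch()
    }
    stmt.executeBatch()
    conn.commit()
  }
  stmt.close()
  conn.close()
}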


How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Dhaval Patel
new_df.withColumn('SVCDATE2',
(new_df.next_diag_date-new_df.SVCDATE).days).show()

+-----------+----------+--------------+
|      PATID|   SVCDATE|next_diag_date|
+-----------+----------+--------------+
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-27|
+-----------+----------+--------------+


Data locality with HDFS not being seen

2015-08-20 Thread Sunil
Hello. I am seeing some unexpected issues with achieving HDFS data
locality. I expect the tasks to be executed only on the node which has the
data, but this is not happening (of course, unless the node is busy, in which
case I understand tasks can go to some other node). Could anyone clarify
what's wrong with the way I am trying, or what I should do instead? Below are
the cluster configuration and the experiments that I have tried. Any help will
be appreciated. If you would like to recreate the below scenario, you
may use the JavaWordCount.java example given within Spark.

*Cluster configuration:*

1. spark-1.4.0 and hadoop-2.7.1
2. Machines -- Master node (master) and 6 worker nodes (node1 to node6) 
3. master acts as -- spark master, HDFS name node and secondary name node, Yarn
resource manager
4. Each of the 6 worker nodes acts as -- spark worker node, HDFS data node,
node manager

*Data on HDFS:*

A 20Mb text file is stored in a single block. With a replication factor of 3,
the text file is stored on nodes 2, 3 and 4.

*Test-1 (Spark stand alone mode):*

Application being run is the standard Java word count count example with the
above text file in HDFS, as input. On job submission, I see in the spark
web-UI that, stage-0(i.e mapToPair) is being run on random nodes (i.e.
node1, node 2, node 6, etc.). By random I mean that, stage 0 executes on the
very first worker node that gets registered to the application (this can be
looked from the event timeline graph). Rather, I am expecting the stage-0 to
be run only on any one of the three nodes 2, 3, or 4. 

* Test-2 (Yarn cluster mode): *
Same as above. No data locality seen. 

* Additional info: *
No other spark applications are running and I have even tried by setting the
/spark.locality.wait/ to 10s, but still no difference.

Thanks and regards,
Sunil



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Data-locality-with-HDFS-not-being-seen-tp24361.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Any suggestion about sendMessageReliably failed because ack was not received within 120 sec

2015-08-20 Thread java8964
The closest information I could find online related to this error is
https://issues.apache.org/jira/browse/SPARK-3633
But it is quite different in our case. In our case, we never saw the (Too many
open files) error; the log just shows the 120 sec timeout.
I checked all the GC output from all 42 executors; the max full gc real=11.79
secs is what I can find, way less than the 120 seconds timeout.
Of the 42 executors, there is one executor whose stdout/stderr page hangs; I cannot
see any GC or log information for this executor, but it is shown as LOADING
on the master page, and I think the reason is just that the WorkerUI could not bind
to 8081 somehow during boot time and bound to 8082 instead, and the master UI
didn't catch that information.
Anyway, my only option now is to increase the timeout of both 
spark.core.connection.ack.wait.timeout and spark.akka.timeout to 600, as 
suggested in the jira, and will report back what I find later.
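Concretely, that just means adding the two settings to the existing spark-shell
invocation, e.g.:
/opt/spark/bin/spark-shell ... --conf spark.core.connection.ack.wait.timeout=600 --conf 
spark.akka.timeout=600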
This same daily job runs about 12 hours in Hive/MR, and can finish in about 4 
hours in Spark (with 25% of the allocated cluster resources). On this point, Spark is 
faster and great, but IF (big IF) every task runs smoothly.
In Hive/MR, if the job is set up, it will finish, maybe slowly, but smoothly. In 
Spark, in this case, it does retry the failed partitions only, but we sometimes saw 4 or 
5 retries, making it in fact much, much slower.
Yong
From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Any suggestion about sendMessageReliably failed because ack was not 
received within 120 sec
Date: Thu, 20 Aug 2015 20:49:52 -0400




Hi, Sparkers:
After first 2 weeks of Spark in our production cluster, with more familiar with 
Spark, we are more confident to avoid Lost Executor due to memory issue. So 
far, most of our jobs won't fail or slow down due to Lost executor.
But sometimes, I observed that individual tasks failed due to 
sendMessageReliably failed because ack was not received within 120 sec. 
Here is the basic information:
Spark 1.3.1 in 1 master + 42 worker boxes in standalone deploymentThe cluster 
also runs Hadoop + MapReduce, so we allocate about 25% resource to Spark. We 
are conservative for the Spark jobs, with low number of cores  + big 
parallelism/partitions to control the memory usage in the job, so far we are 
happen to avoid lost executor.
We have one big daily job is running with following configuration:
/opt/spark/bin/spark-shell --jars spark-avro.jar --conf spark.ui.port=4042 
--executor-memory 20G --total-executor-cores 168 --conf 
spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=6000 
--conf spark.default.parallelism=6000 --conf 
spark.shuffle.blockTransferService=nio -i spark.script
168 cores will make each executor run with 4 thread (168 / 42 = 4)There is no 
cache needed, so I make the storage memoryFraction very lownio is much robust 
than netty in our experience
For this big daily job generating over 2 of tasks, they all could finish 
without this issue, but sometimes, for the same job, tasks keep failing due to 
this error and retry.
But even in this case, I saw the task failed due to this error and retry. Retry 
maybe part of life for distribute environment, but I want to know what root 
cause could behind it and how to avoid it.
Do I increase spark.core.connection.ack.wait.timeout to fix this error? When 
this happened, I saw there is no executor lost, all are alive. 
Below is the message in the log, for example, it complained about timeout to 
connect to host-121.
FetchFailed(BlockManagerId(31, host-121, 38930), shuffleId=3, mapId=17, 
reduceId=2577, message=org.apache.spark.shuffle.FetchFailedException: 
sendMessageReliably failed because ack was not received within 120 sec  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)  at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)  
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)  
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)  at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:154)
  at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:149)
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)  at 
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)  at 

RE: SparkR csv without headers

2015-08-20 Thread Sun, Rui
Hi,

You can create a DataFrame using load.df() with a specified schema.

Something like:
schema <- structType(structField("a", "string"), structField("b", "integer"), ...)
read.df(..., schema = schema)

From: Franc Carter [mailto:franc.car...@rozettatech.com]
Sent: Wednesday, August 19, 2015 1:48 PM
To: user@spark.apache.org
Subject: SparkR csv without headers


Hi,

Does anyone have an example of how to create a DataFrame in SparkR  which 
specifies the column names - the csv files I have do not have column names in 
the first row. I can get read a csv nicely with 
com.databricks:spark-csv_2.10:1.0.3, but I end up with column names C1, C2, C3 
etc


thanks

--

Franc Carter I  Systems ArchitectI RoZetta Technology






L4. 55 Harrington Street,  THE ROCKS,  NSW, 2000

PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA

T  +61 2 8355 2515 I www.rozettatechnology.com


DISCLAIMER: The contents of this email, inclusive of attachments, may be legally

privileged and confidential. Any unauthorised use of the contents is expressly 
prohibited.




Any suggestion about sendMessageReliably failed because ack was not received within 120 sec

2015-08-20 Thread java8964
Hi, Sparkers:
After the first 2 weeks of Spark in our production cluster, and with more familiarity with 
Spark, we are more confident about avoiding lost executors due to memory issues. So 
far, most of our jobs won't fail or slow down due to lost executors.
But sometimes, I observed that individual tasks failed due to 
"sendMessageReliably failed because ack was not received within 120 sec". 
Here is the basic information:
Spark 1.3.1 in 1 master + 42 worker boxes in a standalone deployment. The cluster 
also runs Hadoop + MapReduce, so we allocate about 25% of the resources to Spark. We 
are conservative with the Spark jobs, with a low number of cores + big 
parallelism/partitions to control the memory usage in the job; so far we have 
managed to avoid lost executors.
We have one big daily job running with the following configuration:
/opt/spark/bin/spark-shell --jars spark-avro.jar --conf spark.ui.port=4042 
--executor-memory 20G --total-executor-cores 168 --conf 
spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=6000 
--conf spark.default.parallelism=6000 --conf 
spark.shuffle.blockTransferService=nio -i spark.script
168 cores will make each executor run with 4 threads (168 / 42 = 4). There is no 
cache needed, so I make the storage memoryFraction very low. nio is much more robust 
than netty in our experience.
For this big daily job generating over 2 of tasks, they all could finish 
without this issue, but sometimes, for the same job, tasks keep failing due to 
this error and retry.
But even in this case, I saw tasks fail due to this error and retry. Retries 
may be part of life in a distributed environment, but I want to know what root 
cause could be behind it and how to avoid it.
Should I increase spark.core.connection.ack.wait.timeout to fix this error? When 
this happened, I saw that no executor was lost; all were alive. 
Below is the message in the log, for example, it complained about timeout to 
connect to host-121.
FetchFailed(BlockManagerId(31, host-121, 38930), shuffleId=3, mapId=17, 
reduceId=2577, message=org.apache.spark.shuffle.FetchFailedException: 
sendMessageReliably failed because ack was not received within 120 sec  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)  at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)  
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)  
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)  at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:154)
  at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:149)
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)  at 
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)  at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)  at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)  at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)  at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)  at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)  at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)  at 
org.apache.spark.scheduler.Task.run(Task.scala:64)  at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
 at java.lang.Thread.run(Thread.java:745)Caused by: java.io.IOException: 
sendMessageReliably failed because ack was not received within 120 sec  at 
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:929)
  at 
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:928)
  at scala.Option.foreach(Option.scala:236)  at 
org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:928)
  at 
io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
  at 
io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
  at 

Re: Kafka Spark Partition Mapping

2015-08-20 Thread Cody Koeninger
In general you cannot guarantee which node an RDD will be processed on.

The preferred location for a kafkardd is the kafka leader for that
partition, if they're deployed on the same machine. If you want to try to
override that behavior, the method is getPreferredLocations

But even in that case, location preferences are just a scheduler hint, the
rdd can still be scheduled elsewhere.  You can turn up spark.locality.wait
to a very high value to decrease the likelihood.



On Thu, Aug 20, 2015 at 5:47 PM, nehalsyed nehal_s...@cable.comcast.com
wrote:

 I have data in Kafka topic-partition and I am reading it from Spark like
 this: JavaPairReceiverInputDStream<String, String> directKafkaStream =
 KafkaUtils.createDirectStream(streamingContext, [key class], [value class],
 [key decoder class], [value decoder class], [map of Kafka parameters], [set
 of topics to consume]); I want that message from a kafka partition always
 land on same machine on Spark rdd so I can cache some decoration data
 locally and later reuse with other messages (that belong to same key). Can
 anyone tell me how can I achieve it? Thanks
 --
 View this message in context: Kafka Spark Partition Mapping
 http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Partition-Mapping-tp24372.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: what determine the task size?

2015-08-20 Thread ambujhbti
cwz wrote
 sorry, my question is not clear.
 I mean what determine the one task size? not how many tasks

one task size = one HDFS block size.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-determine-the-task-size-tp24363p24375.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark kafka partitioning

2015-08-20 Thread Gaurav Agarwal
Hello

Regarding Spark Streaming and Kafka Partitioning

When I send messages on a Kafka topic with 3 partitions and listen on a
kafkareceiver with local[4], how will I come to know in Spark
Streaming that different DStreams are created according to the partitions of
the Kafka messages?

Thanks


Re: How to list all dataframes and RDDs available in current session?

2015-08-20 Thread Rishitesh Mishra
I am not sure if you can view all RDDs in a session. Tables are maintained
in a catalogue, hence that is easier. However, you can see the DAG
representation, which lists all the RDDs in a job, in the Spark UI.
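
If it helps, the closest programmatic equivalents I know of (a sketch for a Scala shell;
it only covers persisted RDDs and registered temp tables, not every DataFrame/RDD variable
created in the session):

// RDDs that have been marked persistent in this SparkContext
sc.getPersistentRDDs.foreach { case (id, rdd) => println(s"$id -> $rdd") }

// temp tables registered with the SQLContext (the analogue of `show tables`)
sqlContext.tableNames().foreach(println)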
On 20 Aug 2015 22:34, Dhaval Patel dhaval1...@gmail.com wrote:

 Apologies

 I accidentally included Spark User DL on BCC. The actual email message is
 below.
 =


 Hi:

 I have been working on few example using zeppelin.

 I have been trying to find a command that would list all *dataframes/RDDs*
 that has been created in current session. Anyone knows if there is any such
 commands available?

 Something similar to SparkSQL to list all temp tables :
   show tables;

 Thanks,
 Dhaval



 On Thu, Aug 20, 2015 at 12:49 PM, Dhaval Patel dhaval1...@gmail.com
 wrote:

 Hi:

 I have been working on few example using zeppelin.

 I have been trying to find a command that would list all
 *dataframes/RDDs* that has been created in current session. Anyone knows if
 there is any such commands available?

 Something similar to SparkSQL to list all temp tables :
   show tables;

 Thanks,
 Dhaval





Re: Windowed stream operations -- These are too lazy for some use cases

2015-08-20 Thread Iulian Dragoș
On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes jgri...@adzerk.com wrote:

We are aggregating real time logs of events, and want to do windows of 30
 minutes. However, since the computation doesn't start until 30 minutes have
 passed, there is a ton of data built up that processing could've already
 started on. When it comes time to actually process the data, there is too
 much for our cluster to handle at once.

 The basic idea is this:

  val mergedMain = mergedStream
   .flatMap(r => ...) // denormalize data for this particular output stream
   .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches

Could you add a dummy action at this point?

val firstStep = mergedStream
  .flatMap(r => ...) // denormalize data for this particular output stream
  .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y)) // this would sum over the batches
  .persist() // this will be reused in windowing operations

firstStep.count() // just to trigger computation

firstStep
  .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 180, 180) // sum over the windows
  .map(rec => ...) // convert data to other format
  .foreachRDD { (rdd, time) =>
    rdd.saveAsTextFile(...) // save to text files
  }

   .reduceByKeyAndWindow((x: Array[Int], y: Array[Int]) => sumAggregates(x, y), 180, 180) // sum over the windows
   .map(rec => ...) // convert data to other format
   .foreachRDD{ (rdd, time) =>
 rdd.saveAsTextFile(...) // save to text files
   }

 I would want the batches to be reduced as soon as they arrive (the first
 reduceByKey), since there isn't any reason to wait. Instead all of the
 unprocessed data has to be processed at the same time (this data is being
 heavily denormalized in some cases, and so generates a bunch of additional
 data).

 Thanks for any help.

​
-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: DAG related query

2015-08-20 Thread Andrew Or
Hi Bahubali,

Once RDDs are created, they are immutable (in most cases). In your case you
end up with 3 RDDs:

(1) the original rdd1 that reads from the text file
(2) rdd2, that applies a map function on (1), and
(3) the new rdd1 that applies a map function on (2)

There's no cycle because you have 3 distinct RDDs. All you're doing is
reassigning a reference `rdd1`, but the underlying RDD doesn't change.

-Andrew

2015-08-20 6:21 GMT-07:00 Sean Owen so...@cloudera.com:

 No. The third line creates a third RDD whose reference simply replaces
 the reference to the first RDD in your local driver program. The first
 RDD still exists.

 On Thu, Aug 20, 2015 at 2:15 PM, Bahubali Jain bahub...@gmail.com wrote:
  Hi,
  How would the DAG look like for the below code
 
  JavaRDD<String> rdd1 = context.textFile(SOMEPATH);
  JavaRDD<String> rdd2 = rdd1.map(DO something);
  rdd1 = rdd2.map(Do SOMETHING);
 
  Does this lead to any kind of cycle?
 
  Thanks,
  Baahu

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Windowed stream operations -- These are too lazy for some use cases

2015-08-20 Thread Justin Grimes
I tried something like that. When I tried just doing count() on the
DStream, it didn't seem like it was actually forcing the computation.

What (sort of) worked was doing a forEachRDD((rdd) => rdd.count()), or
doing a print() on the DStream. The only problem was this seemed to add a
lot of processing overhead -- I couldn't figure out exactly why but it
seemed to have something to do with forEachRDD only being executed on the
driver.

On Thu, Aug 20, 2015 at 1:39 PM, Iulian Dragoș iulian.dra...@typesafe.com
wrote:

 On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes jgri...@adzerk.com wrote:

 We are aggregating real time logs of events, and want to do windows of 30
 minutes. However, since the computation doesn't start until 30 minutes have
 passed, there is a ton of data built up that processing could've already
 started on. When it comes time to actually process the data, there is too
 much for our cluster to handle at once.

 The basic idea is this:

  val mergedMain = mergedStream
   .flatMap(r = ) // denormalize data for this particular output
 stream
   .reduceByKey((x:Array[Int],y:Array[Int]) = sumAggregates(x,y)) //
 this would sum over the batches

 Could you add a dummy action at this point?

 val firstStep = mergedStream
   .flatMap(r = ) // denormalize data for this particular output 
 stream
   .reduceByKey((x:Array[Int],y:Array[Int]) = sumAggregates(x,y)) // this 
 would sum over the batches
   .persist() // this will be reused in windowing operations

 firstStep.count() // just to trigger computation

 firstStep
   .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) = 
 sumAggregates(x,y), 180, 180) // sum over the windows
   .map(rec = ...) // convert data to other format
   .foreachRDD{ (rdd, time) =
 rdd.saveAsTextFile(...) // save to text files
   }

   .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) =
 sumAggregates(x,y), 180, 180) // sum over the windows
   .map(rec = ...) // convert data to other format
   .foreachRDD{ (rdd, time) =
 rdd.saveAsTextFile(...) // save to text files
   }

 I would want the batches to be reduced as soon as they arrive (the first
 reduceByKey), since there isn't any reason to wait. Instead all of the
 unprocessed data has to be processed at the same time (this data is being
 heavily denormalized in some cases, and so generates a bunch of additional
 data).

 Thanks for any help.

 ​
 --

 --
 Iulian Dragos

 --
 Reactive Apps on the JVM
 www.typesafe.com




Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Davies Liu
As Aram said, there are two options in Spark 1.4:

1) Use the HiveContext, then you get datediff from Hive:
df.selectExpr("datediff(d2, d1)")
2) Use a Python UDF:
```
>>> from datetime import date
>>> df = sqlContext.createDataFrame([(date(2008, 8, 18), date(2008, 9, 26))],
...                                 ['d1', 'd2'])
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType
>>> diff = udf(lambda a, b: (a - b).days, IntegerType())
>>> df.select(diff(df.d1, df.d2)).show()
+-----------------------+
|PythonUDF#lambda(d1,d2)|
+-----------------------+
|                    -39|
+-----------------------+
```

On Thu, Aug 20, 2015 at 7:45 AM, Aram Mkrtchyan
aram.mkrtchyan...@gmail.com wrote:
 Hi,

 hope this will help you

 import org.apache.spark.sql.functions._
 import sqlContext.implicits._
 import java.sql.Timestamp
 import org.joda.time.{DateTime, Days} // needed for Days.daysBetween below

 val df = sc.parallelize(Array((date1, date2))).toDF("day1", "day2")

 val dateDiff = udf[Long, Timestamp, Timestamp]((value1, value2) =>
   Days.daysBetween(new DateTime(value2.getTime), new DateTime(value1.getTime)).getDays)
 df.withColumn("diff", dateDiff(df("day2"), df("day1"))).show()

 or you can write sql query using hiveql's datediff function.
  https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

 On Thu, Aug 20, 2015 at 4:57 PM, Dhaval Patel dhaval1...@gmail.com wrote:

 More update on this question..I am using spark 1.4.1.

 I was just reading documentation of spark 1.5 (still in development) and I
 think there will be a new func *datediff* that will solve the issue. So
 please let me know if there is any work-around until spark 1.5 is out :).

 pyspark.sql.functions.datediff(end, start)[source]

 Returns the number of days from start to end.

  df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1',
  'd2'])
  df.select(datediff(df.d2, df.d1).alias('diff')).collect()
 [Row(diff=32)]

 New in version 1.5.


 On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel dhaval1...@gmail.com
 wrote:

 Apologies, sent too early accidentally. Actual message is below
 

 A dataframe has 2 datecolumns (datetime type) and I would like to add
 another column that would have difference between these two dates. Dataframe
 snippet is below.

 new_df.show(5)
 +---+--+--+
 | PATID| SVCDATE|next_diag_date|
 +---+--+--+
 |12345655545|2012-02-13| 2012-02-13|
 |12345655545|2012-02-13| 2012-02-13|
 |12345655545|2012-02-13| 2012-02-27|
 +---+--+--+



 Here is what I have tried so far:

 - new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE)).show()
 Error: DateType does not support numeric operations

 - new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE).days).show()
 Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);


 However this simple python code works fine with pySpark:

 from datetime import date
 d0 = date(2008, 8, 18)
 d1 = date(2008, 9, 26)
 delta = d0 - d1
 print (d0 - d1).days

 # -39


 Any suggestions would be appreciated! Also is there a way to add a new
 column in dataframe without using column expression (e.g. like in pandas or
 R. df$new_col = 'new col value')?


 Thanks,
 Dhaval



 On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel dhaval1...@gmail.com
 wrote:

 new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE).days).show()

 +---+--+--+ | PATID| SVCDATE|next_diag_date|
 +---+--+--+ |12345655545|2012-02-13| 
 2012-02-13|
 |12345655545|2012-02-13| 2012-02-13| |12345655545|2012-02-13| 2012-02-27|
 +---+--+--+





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [survey] [spark-ec2] What do you like/dislike about spark-ec2?

2015-08-20 Thread Nicholas Chammas
I'm planning to close the survey to further responses early next week.

If you haven't chimed in yet, the link to the survey is here:

http://goo.gl/forms/erct2s6KRR

We already have some great responses, which you can view. I'll share a
summary after the survey is closed.

Cheers!

Nick


On Mon, Aug 17, 2015 at 11:09 AM Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Howdy folks!

 I’m interested in hearing about what people think of spark-ec2
 http://spark.apache.org/docs/latest/ec2-scripts.html outside of the
 formal JIRA process. Your answers will all be anonymous and public.

 If the embedded form below doesn’t work for you, you can use this link to
 get the same survey:

 http://goo.gl/forms/erct2s6KRR

 Cheers!
 Nick
 ​



Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Dhaval Patel
More updates on this question: I am using Spark 1.4.1.

I was just reading the documentation of Spark 1.5 (still in development) and I
think there will be a new function *datediff* that will solve the issue. So
please let me know if there is any work-around until Spark 1.5 is out :).

pyspark.sql.functions.datediff(*end*, *start*)[source]
http://people.apache.org/~pwendell/spark-releases/spark-1.5.0-preview-20150812-docs/api/python/_modules/pyspark/sql/functions.html#datediff
http://people.apache.org/~pwendell/spark-releases/spark-1.5.0-preview-20150812-docs/api/python/pyspark.sql.html#pyspark.sql.functions.datediff

Returns the number of days from start to end.

>>> df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
>>> df.select(datediff(df.d2, df.d1).alias('diff')).collect()
[Row(diff=32)]

New in version 1.5.

On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel dhaval1...@gmail.com wrote:

 Apologies, sent too early accidentally. Actual message is below
 

 A dataframe has 2 datecolumns (datetime type) and I would like to add
 another column that would have difference between these two dates.
 Dataframe snippet is below.

 new_df.show(5)
 +---+--+--+
 | PATID| SVCDATE|next_diag_date|
 +---+--+--+
 |12345655545|2012-02-13| 2012-02-13|
 |12345655545|2012-02-13| 2012-02-13|
 |12345655545|2012-02-13| 2012-02-27|
 +---+--+--+



 Here is what I have tried so far:

 - new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE)).show()
 Error: DateType does not support numeric operations

 - new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE).days).show()
 Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);


 However this simple python code works fine with pySpark:

 from datetime import date
 d0 = date(2008, 8, 18)
 d1 = date(2008, 9, 26)
 delta = d0 - d1
 print (d0 - d1).days

 # -39


 Any suggestions would be appreciated! Also is there a way to add a new
 column in dataframe without using column expression (e.g. like in pandas or
 R. df$new_col = 'new col value')?


 Thanks,
 Dhaval



 On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel dhaval1...@gmail.com
 wrote:

 new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE).days).show()

 +---+--+--+ | PATID| SVCDATE|next_diag_date|
 +---+--+--+ |12345655545|2012-02-13|
 2012-02-13| |12345655545|2012-02-13| 2012-02-13| |12345655545|2012-02-13|
 2012-02-27| +---+--+--+





Re: DAG related query

2015-08-20 Thread Sean Owen
No. The third line creates a third RDD whose reference simply replaces
the reference to the first RDD in your local driver program. The first
RDD still exists.

On Thu, Aug 20, 2015 at 2:15 PM, Bahubali Jain bahub...@gmail.com wrote:
 Hi,
 How would the DAG look like for the below code

 JavaRDD<String> rdd1 = context.textFile(SOMEPATH);
 JavaRDD<String> rdd2 = rdd1.map(DO something);
 rdd1 = rdd2.map(Do SOMETHING);

 Does this lead to any kind of cycle?

 Thanks,
 Baahu

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



PySpark concurrent jobs using single SparkContext

2015-08-20 Thread Mike Sukmanowsky
Hi all,

We're using Spark 1.3.0 via a small YARN cluster to do some log processing.
The jobs are pretty simple, for a number of customers and a number of days,
fetch some event log data, build aggregates and store those aggregates into
a data store.

The way our script is written right now does something akin to:

with SparkContext() as sc:
    for customer in customers:
        for day in days:
            logs = sc.textFile(get_logs(customer, day))
            aggregate = make_aggregate(logs)
            # This function contains the action saveAsNewAPIHadoopFile which
            # triggers a save
            save_aggregate(aggregate)

​
So we have a Spark job per customer, per day.

I tried doing some parallel job submission with something similar to:

def make_and_save_aggregate(customer, day, spark_context):
    # Without a separate threading.Lock() here or better yet, one guarding the
    # Spark context, multiple customer/day transformations and actions could
    # be interweaved
    sc = spark_context
    logs = sc.textFile(get_logs(customer, day))
    aggregate = make_aggregate(logs)
    save_aggregate(aggregate)

with SparkContext() as sc, futures.ThreadPoolExecutor(4) as executor:
    for customer in customers:
        for day in days:
            executor.submit(make_and_save_aggregate, customer, day, sc)

​
The problem is, with no locks on a SparkContext except during initialization
https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L214-L241
and
shutdown
https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L296-L307,
operations on the context could (if I understand correctly) be interweaved
leading to DAG which contains transformations out of order and from
different customer, day periods.

One solution is instead to launch multiple Spark jobs via spark-submit and
let YARN/Spark's dynamic executor allocation take care of fair scheduling.
In practice, this doesn't seem to yield very fast computation perhaps due
to some additional overhead with YARN.

Is there any safe way to launch concurrent jobs like this using a single
PySpark context?

-- 
Mike Sukmanowsky
Aspiring Digital Carpenter

*e*: mike.sukmanow...@gmail.com

LinkedIn http://www.linkedin.com/profile/view?id=10897143 | github
https://github.com/msukmanowsky


Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Sandy Ryza
What version of Spark are you using?  Have you set any shuffle configs?

On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com wrote:

 I have one Spark job which seems to run fine but after one hour or so
 executor start getting lost because of time out something like the
 following
 error

 cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds
 60 seconds

 and because of above error couple of chained errors starts to come like
 FetchFailedException, Rpc client disassociated, Connection reset by peer,
 IOException etc

 Please see the following UI page I have noticed when shuffle read/write
 starts to increase more than 10 GB executors starts getting lost because of
 timeout. How do I clear this stacked memory of 10 GB in shuffle read/write
 section I dont cache anything why Spark is not clearing those memory.
 Please
 guide.

 IMG_20150819_231418358.jpg
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
 




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark.sql.shuffle.partitions=1 seems to be working fine but creates timeout for large skewed data

2015-08-20 Thread Umesh Kacha
Hi Hemant, sorry for the confusion. I meant the final output part files in the
final HDFS directory; I never meant intermediate files. Thanks. My goal is
to reduce that many files because of my use case explained in the first
email with the calculations.
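For what it's worth, a middle-ground sketch: keep a reasonable
spark.sql.shuffle.partitions for the aggregation itself and coalesce only just before the
write, so each output directory gets a handful of files instead of 200 (the numbers, query
and path are placeholders, and a HiveContext/SQLContext query is assumed):

val aggregated = hiveContext.sql("SELECT ... FROM ... GROUP BY ...") // skewed aggregation

aggregated
  .coalesce(10)       // merges partitions without a full shuffle
  .write
  .mode("append")
  .parquet("/output/table/day=2015-08-20")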
On Aug 20, 2015 5:59 PM, Hemant Bhanawat hemant9...@gmail.com wrote:

 Sorry, I misread your mail. Thanks for pointing that out.

 BTW, are the 800,000 files shuffle intermediate output and not the final
 output? I assume yes. I didn't know that you can keep intermediate output
 on HDFS and I don't think that is recommended.




 On Thu, Aug 20, 2015 at 2:43 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Looks like you are using hash based shuffling and not sort based
 shuffling which creates a single file per maptask.

 On Thu, Aug 20, 2015 at 12:43 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I have a Spark job which deals with large skewed dataset. I have
 around
 1000 Hive partitions to process in four different tables every day. So
 if I
 go with 200 spark.sql.shuffle.partitions default partitions created by
 Spark
  I end up with 4 * 1000 * 200 = 800,000 small files in HDFS which
 wont be
 good for HDFS name node I have been told if you keep on creating such
 large
 no of small small files namenode will crash is it true? please help me
 understand. Anyways so to avoid creating small files I did set
 spark.sql.shuffle.partitions=1 it seems to be creating 1 output file and
 as
 per my understanding because of only one output there is so much
 shuffling
 to do to bring all data to once reducer please correct me if I am wrong.
 This is causing memory/timeout issues how do I deal with it

 I tried to give spark.shuffle.storage=0.7 also still this memory seems
 not
 enough for it. I have 25 gb executor with 4 cores and 20 such executors
 still Spark job fails please guide.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-shuffle-partitions-1-seems-to-be-working-fine-but-creates-timeout-for-large-skewed-data-tp24346.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: SparkSQL concerning materials

2015-08-20 Thread Muhammad Atif
Hi Dawid

The best place to get started is the Spark SQL Guide from Apache:
http://spark.apache.org/docs/latest/sql-programming-guide.html

Regards
Muhammad

On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz 
wysakowicz.da...@gmail.com wrote:

 Hi,

 I would like to dip into SparkSQL. Get to know better the architecture,
 good practices, some internals. Could you advise me some materials on this
 matter?

 Regards
 Dawid



Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Sandy Ryza
Moving this back onto user@

Regarding GC, can you look in the web UI and see whether the GC time
metric dominates the amount of time spent on each task (or at least the
tasks that aren't completing)?

Also, have you tried bumping your spark.yarn.executor.memoryOverhead?  YARN
may be killing your executors for using too much off-heap space.  You can
see whether this is happening by looking in the Spark AM or YARN
NodeManager logs.

-Sandy

On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi thanks much for the response. Yes I tried default settings too 0.2 it
 was also going into timeout if it is spending time in GC then why it is not
 throwing GC error I don't see any such error. Yarn logs are not helpful at
 all. What is tungsten how do I use it? Spark is doing great I believe my
 job runs successfully and 60% tasks completes only after first executor
 gets lost things are messing.
 On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What sounds most likely is that you're hitting heavy garbage collection.
 Did you hit issues when the shuffle memory fraction was at its default of
 0.2?  A potential danger with setting the shuffle storage to 0.7 is that it
 allows shuffle objects to get into the GC old generation, which triggers
 more stop-the-world garbage collections.

 Have you tried enabling Tungsten / unsafe?

 Unfortunately, Spark is still not that great at dealing with
 heavily-skewed shuffle data, because its reduce-side aggregation still
 operates on Java objects instead of binary data.

 -Sandy

 On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi Sandy thanks much for the response. I am using Spark 1.4.1 and I have
 set spark.shuffle.storage as 0.7 as my spark job involves 4 groupby queries
 executed using hiveContext.sql my data set is skewed so will be more
 shuffling I believe I don't know what's wrong spark job runs fine for
 almost an hour and when shuffle read shuffle write column in UI starts to
 show more than 10 gb executor starts to getting lost because of timeout and
 slowly other executor starts getting lost. Please guide.
 On Aug 20, 2015 7:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What version of Spark are you using?  Have you set any shuffle configs?

 On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com
 wrote:

 I have one Spark job which seems to run fine but after one hour or so
 executor start getting lost because of time out something like the
 following
 error

 cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds
 60 seconds

 and because of above error couple of chained errors starts to come like
 FetchFailedException, Rpc client disassociated, Connection reset by
 peer,
 IOException etc

 Please see the following UI page I have noticed when shuffle read/write
 starts to increase more than 10 GB executors starts getting lost
 because of
 timeout. How do I clear this stacked memory of 10 GB in shuffle
 read/write
 section I dont cache anything why Spark is not clearing those memory.
 Please
 guide.

 IMG_20150819_231418358.jpg
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
 




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Convert mllib.linalg.Matrix to Breeze

2015-08-20 Thread Burak Yavuz
Matrix.toBreeze is a private method. MLlib matrices have the same structure
as Breeze Matrices. Just create a new Breeze matrix like this
https://github.com/apache/spark/blob/43e0135421b2262cbb0e06aae53523f663b4f959/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L270
.

Best,
Burak
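
A minimal sketch of that conversion (assuming the matrix fits comfortably in
memory; toArray densifies a SparseMatrix):

  import breeze.linalg.{DenseMatrix => BDM}
  import org.apache.spark.mllib.linalg.Matrix

  // Both MLlib's DenseMatrix and Breeze's DenseMatrix are column-major, and
  // Matrix.toArray returns the values in column-major order, so the array can
  // be handed straight to the Breeze constructor.
  def toBreeze(m: Matrix): BDM[Double] =
    new BDM[Double](m.numRows, m.numCols, m.toArray)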


On Thu, Aug 20, 2015 at 3:28 AM, Yanbo Liang yblia...@gmail.com wrote:

 You can use Matrix.toBreeze()
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L56
  .

 2015-08-20 18:24 GMT+08:00 Naveen nav...@formcept.com:

 Hi All,

 Is there anyway to convert a mllib matrix to a Dense Matrix of Breeze?
 Any leads are appreciated.


 Thanks,
 Naveen

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Data frame created from hive table and its partition

2015-08-20 Thread VIJAYAKUMAR JAWAHARLAL
Hi 

I have a question regarding data frame partition. I read a hive table from 
spark and following spark api converts it as DF.

test_df = sqlContext.sql(“select * from hivetable1”)

How does spark decide partition of test_df? Is there a way to partition test_df 
based on some column while reading hive table? Second question is, if that hive 
table has primary key declared, does spark honor PK in hive table and partition 
based on PKs?

Thanks
Vijay
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-20 Thread satish chandra j
HI All,
Could anybody let me know what it is that I am missing here? It should work, as
it's a basic transformation

Please let me know if any additional information required

Regards,
Satish

On Thu, Aug 20, 2015 at 3:35 PM, satish chandra j jsatishchan...@gmail.com
wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on
 Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at
 console:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish
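
A minimal sketch, assuming the empty result comes from calling take() on the
original RDD rather than on the RDD returned by reduceByKey (transformations
return a new RDD and leave the original untouched):

  val pairs = sc.parallelize(Seq((0, 1), (0, 2), (1, 20), (1, 30), (2, 40)))

  // Capture the transformed RDD in a new value and collect from that.
  val summed = pairs.reduceByKey(_ + _)
  summed.collect().foreach(println)   // (0,3), (1,50), (2,40), in some order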




Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Aram Mkrtchyan
Hi,

hope this will help you

import java.sql.Timestamp

import org.apache.spark.sql.functions._
import org.joda.time.{DateTime, Days}
import sqlContext.implicits._

// date1 and date2 are assumed to be java.sql.Timestamp values
val df = sc.parallelize(Array((date1, date2))).toDF("day1", "day2")

val dateDiff = udf[Long, Timestamp, Timestamp]((value1, value2) =>
  Days.daysBetween(new DateTime(value2.getTime), new DateTime(value1.getTime)).getDays)

df.withColumn("diff", dateDiff(df("day2"), df("day1"))).show()

or you can write sql query using hiveql's datediff function.
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
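
A short sketch of the HiveQL route (assumes a HiveContext and that the DataFrame
has been registered as a temp table; the table name is a placeholder):

  df.registerTempTable("events")
  hiveContext.sql("SELECT day1, day2, datediff(day2, day1) AS diff FROM events").show()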

On Thu, Aug 20, 2015 at 4:57 PM, Dhaval Patel dhaval1...@gmail.com wrote:

 More update on this question..I am using spark 1.4.1.

 I was just reading documentation of spark 1.5 (still in development) and I
 think there will be a new func *datediff* that will solve the issue. So
 please let me know if there is any work-around until spark 1.5 is out :).

 pyspark.sql.functions.datediff(*end*, *start*)[source]
 http://people.apache.org/~pwendell/spark-releases/spark-1.5.0-preview-20150812-docs/api/python/_modules/pyspark/sql/functions.html#datediff
 http://people.apache.org/~pwendell/spark-releases/spark-1.5.0-preview-20150812-docs/api/python/pyspark.sql.html#pyspark.sql.functions.datediff

 Returns the number of days from start to end.

  df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 
  'd2']) df.select(datediff(df.d2, 
  df.d1).alias('diff')).collect()[Row(diff=32)]

 New in version 1.5.

 On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel dhaval1...@gmail.com
 wrote:

 Apologies, sent too early accidentally. Actual message is below
 

 A dataframe has 2 datecolumns (datetime type) and I would like to add
 another column that would have difference between these two dates.
 Dataframe snippet is below.

 new_df.show(5)
 +---+--+--+
 | PATID| SVCDATE|next_diag_date|
 +---+--+--+
 |12345655545|2012-02-13| 2012-02-13|
 |12345655545|2012-02-13| 2012-02-13|
 |12345655545|2012-02-13| 2012-02-27|
 +---+--+--+



 Here is what I have tried so far:

 - new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE)).show()
 Error: DateType does not support numeric operations

 - new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE).days).show()
 Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);


 However this simple python code works fine with pySpark:

 from datetime import date
 d0 = date(2008, 8, 18)
 d1 = date(2008, 9, 26)
 delta = d0 - d1
 print (d0 - d1).days

 # -39


 Any suggestions would be appreciated! Also is there a way to add a new
 column in dataframe without using column expression (e.g. like in pandas or
 R. df$new_col = 'new col value')?


 Thanks,
 Dhaval



 On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel dhaval1...@gmail.com
 wrote:

 new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE).days).show()

 +---+--+--+ | PATID| SVCDATE|next_diag_date|
 +---+--+--+ |12345655545|2012-02-13|
 2012-02-13| |12345655545|2012-02-13| 2012-02-13| |12345655545|2012-02-13|
 2012-02-27| +---+--+--+






DAG related query

2015-08-20 Thread Bahubali Jain
Hi,
How would the DAG look like for the below code

JavaRDDString rdd1 = context.textFile(SOMEPATH);
JavaRDDString rdd2 = rdd1.map(DO something);
rdd1 =  rdd2.map(Do SOMETHING);

Does this lead to any kind of cycle?

Thanks,
Baahu


Re: spark.sql.shuffle.partitions=1 seems to be working fine but creates timeout for large skewed data

2015-08-20 Thread Hemant Bhanawat
Sorry, I misread your mail. Thanks for pointing that out.

BTW, are the 800,000 files shuffle intermediate output and not the final
output? I assume yes. I didn't know that you can keep intermediate output
on HDFS and I don't think that is recommended.




On Thu, Aug 20, 2015 at 2:43 PM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 Looks like you are using hash based shuffling and not sort based shuffling
 which creates a single file per maptask.

 On Thu, Aug 20, 2015 at 12:43 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I have a Spark job which deals with large skewed dataset. I have around
 1000 Hive partitions to process in four different tables every day. So if
 I
 go with 200 spark.sql.shuffle.partitions default partitions created by
 Spark
 I end up with 4 * 1000 * 200 = 800,000 small files in HDFS which won't
 be
 good for HDFS name node I have been told if you keep on creating such
 large
 no of small small files namenode will crash is it true? please help me
 understand. Anyways so to avoid creating small files I did set
 spark.sql.shuffle.partitions=1 it seems to be creating 1 output file and
 as
 per my understanding because of only one output there is so much shuffling
 to do to bring all data to once reducer please correct me if I am wrong.
 This is causing memory/timeout issues how do I deal with it

 I tried to give spark.shuffle.storage=0.7 also still this memory seems not
 enough for it. I have 25 gb executor with 4 cores and 20 such executors
 still Spark job fails please guide.
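
One possible middle ground, not discussed in this thread, is to keep a sensible
shuffle parallelism for the aggregation and only shrink the number of output
files right before writing. A rough sketch (table, column and path names are
placeholders):

  sqlContext.setConf("spark.sql.shuffle.partitions", "200")
  val aggregated = sqlContext.sql("SELECT key, count(*) AS cnt FROM some_table GROUP BY key")
  aggregated.coalesce(8).write.parquet("/output/path")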



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-shuffle-partitions-1-seems-to-be-working-fine-but-creates-timeout-for-large-skewed-data-tp24346.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Convert mllib.linalg.Matrix to Breeze

2015-08-20 Thread Naveen

Hi,

Thanks for the reply. I tried Matrix.toBreeze() which returns the 
following error:


*/method toBreeze in trait Matrix cannot be accessed in 
org.apache.spark.mllib.linalg.Matrix/*



On Thursday 20 August 2015 07:50 PM, Burak Yavuz wrote:
Matrix.toBreeze is a private method. MLlib matrices have the same 
structure as Breeze Matrices. Just create a new Breeze matrix like 
this 
https://github.com/apache/spark/blob/43e0135421b2262cbb0e06aae53523f663b4f959/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L270. 



Best,
Burak


On Thu, Aug 20, 2015 at 3:28 AM, Yanbo Liang yblia...@gmail.com 
mailto:yblia...@gmail.com wrote:


You can use Matrix.toBreeze()

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L56
 .

2015-08-20 18:24 GMT+08:00 Naveen nav...@formcept.com
mailto:nav...@formcept.com:

Hi All,

Is there anyway to convert a mllib matrix to a Dense Matrix of
Breeze? Any leads are appreciated.


Thanks,
Naveen

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
mailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
mailto:user-h...@spark.apache.org







Re: SparkR - can't create spark context - JVM not ready

2015-08-20 Thread Deborah Siegel
Thanks Shivaram. You got me wondering about the path so I put it in full
and it worked. R does not, of course, expand a ~.

On Thu, Aug 20, 2015 at 4:35 PM, Shivaram Venkataraman 
shiva...@eecs.berkeley.edu wrote:

 Can you check if the file
 `~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit` exists ? The
 error message seems to indicate it is trying to pick up Spark from
 that location and can't seem to find Spark installed there.

 Thanks
 Shivaram

 On Thu, Aug 20, 2015 at 3:30 PM, Deborah Siegel
 deborah.sie...@gmail.com wrote:
  Hello,
 
  I have previously successfully run SparkR in RStudio, with:
 
 Sys.setenv(SPARK_HOME=~/software/spark-1.4.1-bin-hadoop2.4)
 .libPaths(c(file.path(Sys.getenv(SPARK_HOME), R, lib),
 .libPaths()))
 library(SparkR)
 sc - sparkR.init(master=local[2],appName=SparkR-example)
 
 
  Then I tried putting some of it into an .Rprofile. It seemed to work to
 load
  the paths and SparkR, but I got an error when trying to create the sc. I
  then removed my .Rprofile, as well as .rstudio-desktop. However, I still
  cannot create the sc. Here is the error
 
  sc - sparkR.init(master=local[2],appName=SparkR-example)
  Launching java with spark-submit command
  ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit   sparkr-shell
 
 /var/folders/p7/k1bpgmx93yd6pjq7dzf35gk8gn/T//RtmpOitA28/backend_port23377046db
  sh: ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit: No such file
 or
  directory
  Error in sparkR.init(master = local[2], appName = SparkR-example) :
  JVM is not ready after 10 seconds
 
  I suspected there was an incomplete process or something. I checked for
 any
  running R or Java processes and there were none. Has someone seen this
 type
  of error? I have the same error in both RStudio and in R shell (but not
  sparkR wrapper).
 
  Thanks,
  Deb
 
 



Re: spark kafka partitioning

2015-08-20 Thread Cody Koeninger
I'm not clear on your question, can you rephrase it?  Also, are you talking
about createStream or createDirectStream?

On Thu, Aug 20, 2015 at 9:48 PM, Gaurav Agarwal gaurav130...@gmail.com
wrote:

 Hello

 Regarding Spark Streaming and Kafka Partitioning

 When i send message on kafka topic with 3 partitions and listens on
 kafkareceiver with local value[4] . how will i come to know in Spark
 Streaming that different Dstreams are created according to partitions of
 kafka messages .

 Thanks
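
In case the direct approach is what is being asked about, a minimal sketch
(broker address and topic name are placeholders; requires the
spark-streaming-kafka artifact): with createDirectStream each generated RDD has
exactly as many partitions as the topic has Kafka partitions.

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val ssc = new StreamingContext(sc, Seconds(10))
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("my-topic"))

  // A 3-partition topic yields RDDs with 3 partitions per batch.
  stream.foreachRDD(rdd => println(rdd.partitions.length))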




Re: SparkR csv without headers

2015-08-20 Thread Franc Carter
Thanks - works nicely

cheers

On Fri, Aug 21, 2015 at 12:43 PM, Sun, Rui rui@intel.com wrote:

 Hi,



 You can create a DataFrame using load.df() with a specified schema.



 Something like:

 schema <- structType(structField("a", "string"), structField("b", "integer"), ...)

 read.df ( …, schema = schema)



 *From:* Franc Carter [mailto:franc.car...@rozettatech.com]
 *Sent:* Wednesday, August 19, 2015 1:48 PM
 *To:* user@spark.apache.org
 *Subject:* SparkR csv without headers





 Hi,



 Does anyone have an example of how to create a DataFrame in SparkR  which
 specifies the column names - the csv files I have do not have column names
 in the first row. I can get read a csv nicely with
 com.databricks:spark-csv_2.10:1.0.3, but I end up with column names C1, C2,
 C3 etc





 thanks



 --

 *Franc Carter* I  Systems ArchitectI RoZetta Technology






 L4. 55 Harrington Street,  THE ROCKS,  NSW, 2000

 PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA

 *T*  +61 2 8355 2515 Iwww.rozettatechnology.com


 DISCLAIMER: The contents of this email, inclusive of attachments, may be
 legally

 privileged and confidential. Any unauthorised use of the contents is
 expressly prohibited.








-- 

*Franc Carter* I  Systems ArchitectI RoZetta Technology






L4. 55 Harrington Street,  THE ROCKS,  NSW, 2000

PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA

*T*  +61 2 8355 2515 Iwww.rozettatechnology.com


DISCLAIMER: The contents of this email, inclusive of attachments, may be
legally

privileged and confidential. Any unauthorised use of the contents is
expressly prohibited.


Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Sandy Ryza
GC wouldn't necessarily result in errors - it could just be slowing down
your job and causing the executor JVMs to stall.  If you click on a stage
in the UI, you should end up on a page with all the metrics concerning the
tasks that ran in that stage.  GC Time is one of these task metrics.

-Sandy

On Thu, Aug 20, 2015 at 8:54 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi where do I see GC time in UI? I have set spark.yarn.executor.memoryOverhead
 as 3500 which seems to be good enough I believe. So you mean only GC could
 be the reason behind timeout I checked Yarn logs I did not see any GC error
 there. Please guide. Thanks much.

 On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Moving this back onto user@

 Regarding GC, can you look in the web UI and see whether the GC time
 metric dominates the amount of time spent on each task (or at least the
 tasks that aren't completing)?

 Also, have you tried bumping your spark.yarn.executor.memoryOverhead?
 YARN may be killing your executors for using too much off-heap space.  You
 can see whether this is happening by looking in the Spark AM or YARN
 NodeManager logs.

 -Sandy

 On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi thanks much for the response. Yes I tried default settings too 0.2 it
 was also going into timeout if it is spending time in GC then why it is not
 throwing GC error I don't see any such error. Yarn logs are not helpful at
 all. What is tungsten how do I use it? Spark is doing great I believe my
 job runs successfully and 60% tasks completes only after first executor
 gets lost things are messing.
 On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What sounds most likely is that you're hitting heavy garbage
 collection.  Did you hit issues when the shuffle memory fraction was at its
 default of 0.2?  A potential danger with setting the shuffle storage to 0.7
 is that it allows shuffle objects to get into the GC old generation, which
 triggers more stop-the-world garbage collections.

 Have you tried enabling Tungsten / unsafe?

 Unfortunately, Spark is still not that great at dealing with
 heavily-skewed shuffle data, because its reduce-side aggregation still
 operates on Java objects instead of binary data.

 -Sandy

 On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi Sandy thanks much for the response. I am using Spark 1.4.1 and I
 have set spark.shuffle.storage as 0.7 as my spark job involves 4 groupby
 queries executed using hiveContext.sql my data set is skewed so will be
 more shuffling I believe I don't know what's wrong spark job runs fine for
 almost an hour and when shuffle read shuffle write column in UI starts to
 show more than 10 gb executor starts to getting lost because of timeout 
 and
 slowly other executor starts getting lost. Please guide.
 On Aug 20, 2015 7:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What version of Spark are you using?  Have you set any shuffle
 configs?

 On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com
 wrote:

 I have one Spark job which seems to run fine but after one hour or so
 executor start getting lost because of time out something like the
 following
 error

 cluster.yarnScheduler : Removing an executor 14 65 timeout
 exceeds
 60 seconds

 and because of above error couple of chained errors starts to come
 like
 FetchFailedException, Rpc client disassociated, Connection reset by
 peer,
 IOException etc

 Please see the following UI page I have noticed when shuffle
 read/write
 starts to increase more than 10 GB executors starts getting lost
 because of
 timeout. How do I clear this stacked memory of 10 GB in shuffle
 read/write
 section I dont cache anything why Spark is not clearing those
 memory. Please
 guide.

 IMG_20150819_231418358.jpg
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
 




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: Creating Spark DataFrame from large pandas DataFrame

2015-08-20 Thread Burak Yavuz
If you would like to try using spark-csv, please use
`pyspark --packages com.databricks:spark-csv_2.11:1.2.0`

You're missing a dependency.

Best,
Burak

On Thu, Aug 20, 2015 at 1:08 PM, Charlie Hack charles.t.h...@gmail.com
wrote:

 Hi,

 I'm new to spark and am trying to create a Spark df from a pandas df with
 ~5 million rows. Using Spark 1.4.1.

 When I type:

 df = sqlContext.createDataFrame(pandas_df.where(pd.notnull(didf), None))

 (the df.where is a hack I found on the Spark JIRA to avoid a problem with
 NaN values making mixed column types)

 I get:

 TypeError: cannot create an RDD from type: type 'list'

 Converting a smaller pandas dataframe (~2000 rows) works fine. Anyone had
 this issue?


 This is already a workaround-- ideally I'd like to read the spark
 dataframe from a Hive table. But this is currently not an option for my
 setup.

 I also tried reading the data into spark from a CSV using spark-csv.
 Haven't been able to make this work as yet. I launch

 $ pyspark --jars path/to/spark-csv_2.11-1.2.0.jar

 and when I attempt to read the csv I get:

 Py4JJavaError: An error occurred while calling o22.load. :
 java.lang.NoClassDefFoundError: org/apache/commons/csv/CSVFormat ...

 Other options I can think of:

 - Convert my CSV to json (use Pig?) and read into Spark
 - Read in using jdbc connect from postgres

 But want to make sure I'm not misusing Spark or missing something obvious.

 Thanks!

 Charlie



org.apache.hadoop.security.AccessControlException: Permission denied when access S3

2015-08-20 Thread Shuai Zheng
Hi All,

 

I try to access S3 file from S3 in Hadoop file format:

 

Below is my code:

 

Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.awsAccessKeyId", this.getAwsAccessKeyId());
hadoopConf.set("fs.s3n.awsSecretAccessKey", this.getAwsSecretAccessKey());

lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class,
        LongWritable.class, Text.class, hadoopConf).values()
    .map(new Function<Text, String>() {
        @Override
        public String call(Text arg0) throws Exception {
            return arg0.toString();
        }
    });

And I have below error:

 

Exception in thread main
org.apache.hadoop.security.AccessControlException: Permission denied:
s3n://

at
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(J
ets3tNativeFileSystemStore.java:449)

at
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(J
ets3tNativeFileSystemStore.java:427)

 

The permissions should not be the problem (I can use ctx.textFile on the same
location without any issue), so the issue seems to come from the newAPIHadoopFile call.

 

Anything else I need to setup for this?

 

Regards,

 

Shuai



Re: SparkSQL concerning materials

2015-08-20 Thread Ted Yu
See also
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package

Cheers

On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif muhammadatif...@gmail.com
wrote:

 Hi Dawid

 The best place to get started is the Spark SQL Guide from Apache
 http://spark.apache.org/docs/latest/sql-programming-guide.html

 Regards
 Muhammad

 On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz 
 wysakowicz.da...@gmail.com wrote:

 Hi,

 I would like to dip into SparkSQL. Get to know better the architecture,
 good practices, some internals. Could you advise me some materials on this
 matter?

 Regards
 Dawid





Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Umesh Kacha
Hi where do I see GC time in UI? I have set spark.yarn.executor.memoryOverhead
as 3500 which seems to be good enough I believe. So you mean only GC could
be the reason behind timeout I checked Yarn logs I did not see any GC error
there. Please guide. Thanks much.

On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Moving this back onto user@

 Regarding GC, can you look in the web UI and see whether the GC time
 metric dominates the amount of time spent on each task (or at least the
 tasks that aren't completing)?

 Also, have you tried bumping your spark.yarn.executor.memoryOverhead?
 YARN may be killing your executors for using too much off-heap space.  You
 can see whether this is happening by looking in the Spark AM or YARN
 NodeManager logs.

 -Sandy

 On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi thanks much for the response. Yes I tried default settings too 0.2 it
 was also going into timeout if it is spending time in GC then why it is not
 throwing GC error I don't see any such error. Yarn logs are not helpful at
 all. What is tungsten how do I use it? Spark is doing great I believe my
 job runs successfully and 60% tasks completes only after first executor
 gets lost things are messing.
 On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What sounds most likely is that you're hitting heavy garbage
 collection.  Did you hit issues when the shuffle memory fraction was at its
 default of 0.2?  A potential danger with setting the shuffle storage to 0.7
 is that it allows shuffle objects to get into the GC old generation, which
 triggers more stop-the-world garbage collections.

 Have you tried enabling Tungsten / unsafe?

 Unfortunately, Spark is still not that great at dealing with
 heavily-skewed shuffle data, because its reduce-side aggregation still
 operates on Java objects instead of binary data.

 -Sandy

 On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi Sandy thanks much for the response. I am using Spark 1.4.1 and I
 have set spark.shuffle.storage as 0.7 as my spark job involves 4 groupby
 queries executed using hiveContext.sql my data set is skewed so will be
 more shuffling I believe I don't know what's wrong spark job runs fine for
 almost an hour and when shuffle read shuffle write column in UI starts to
 show more than 10 gb executor starts to getting lost because of timeout and
 slowly other executor starts getting lost. Please guide.
 On Aug 20, 2015 7:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What version of Spark are you using?  Have you set any shuffle configs?

 On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com
 wrote:

 I have one Spark job which seems to run fine but after one hour or so
 executor start getting lost because of time out something like the
 following
 error

 cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds
 60 seconds

 and because of above error couple of chained errors starts to come
 like
 FetchFailedException, Rpc client disassociated, Connection reset by
 peer,
 IOException etc

 Please see the following UI page I have noticed when shuffle
 read/write
 starts to increase more than 10 GB executors starts getting lost
 because of
 timeout. How do I clear this stacked memory of 10 GB in shuffle
 read/write
 section I dont cache anything why Spark is not clearing those memory.
 Please
 guide.

 IMG_20150819_231418358.jpg
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
 




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







dataframe json schema scan

2015-08-20 Thread Alex Nastetsky
The doc for DataFrameReader#json(RDD[String]) method says

Unless the schema is specified using schema function, this function goes
through the input once to determine the input schema.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader

Why is this necessary? Why can't it create the dataframe at the same time
as it's determining the schema?

Thanks.
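
A short sketch of the schema-first path the scaladoc refers to, which avoids the
extra pass needed for inference (field names are placeholders):

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.types._

  val schema = StructType(Seq(
    StructField("id", LongType, nullable = true),
    StructField("name", StringType, nullable = true)))

  val jsonRdd: RDD[String] = sc.parallelize(Seq("""{"id": 1, "name": "a"}"""))
  val df = sqlContext.read.schema(schema).json(jsonRdd)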


Re: Data frame created from hive table and its partition

2015-08-20 Thread VIJAYAKUMAR JAWAHARLAL
Thanks Michael. My bad regarding hive table primary keys.

I have one big 140GB hdfs file and external hive table defined on it. Table is 
not partitioned. When I read external hive table using sqlContext.sql, how does 
spark decides number of partitions which should be created for that data frame?

SparkUI tells me that 1000+ tasks are created to read the above mentioned 
table. I guess one task per hdfs block. Does that mean it creates 1000+ 
partition created for DF? Is there a way to (hash)partition data frame on 
specific key column[s] when I read/load the hive table in spark?

Thanks,
Vijay
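
A small sketch for inspecting and changing the partition count of such a
DataFrame (in Spark 1.4 a DataFrame can only be repartitioned to a target
number; repartitioning by a specific column is not exposed on DataFrames in
this version, as far as I can tell):

  val test_df = sqlContext.sql("select * from hivetable1")
  println(test_df.rdd.partitions.length)   // roughly one partition per HDFS block here

  val smaller = test_df.repartition(200)   // shuffles into 200 partitions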


 On Aug 20, 2015, at 3:05 PM, Michael Armbrust mich...@databricks.com wrote:
 
 There is no such thing as primary keys in the Hive metastore, but Spark SQL 
 does support partitioned hive tables: 
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables
  
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables
 
 DataFrameWriter also has a partitionBy method.
 
 On Thu, Aug 20, 2015 at 7:29 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io 
 mailto:sparkh...@data2o.io wrote:
 Hi
 
 I have a question regarding data frame partition. I read a hive table from 
 spark and following spark api converts it as DF.
 
 test_df = sqlContext.sql(“select * from hivetable1”)
 
 How does spark decide partition of test_df? Is there a way to partition 
 test_df based on some column while reading hive table? Second question is, if 
 that hive table has primary key declared, does spark honor PK in hive table 
 and partition based on PKs?
 
 Thanks
 Vijay
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 mailto:user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org 
 mailto:user-h...@spark.apache.org
 
 



load NULL Values in RDD

2015-08-20 Thread SAHA, DEBOBROTA
Hi ,

Can anyone help me in loading a column that may or may not have NULL values into
an RDD.


Thanks


Creating Spark DataFrame from large pandas DataFrame

2015-08-20 Thread Charlie Hack
Hi,

I'm new to spark and am trying to create a Spark df from a pandas df with
~5 million rows. Using Spark 1.4.1.

When I type:

df = sqlContext.createDataFrame(pandas_df.where(pd.notnull(didf), None))

(the df.where is a hack I found on the Spark JIRA to avoid a problem with
NaN values making mixed column types)

I get:

TypeError: cannot create an RDD from type: type 'list'

Converting a smaller pandas dataframe (~2000 rows) works fine. Anyone had
this issue?


This is already a workaround-- ideally I'd like to read the spark dataframe
from a Hive table. But this is currently not an option for my setup.

I also tried reading the data into spark from a CSV using spark-csv.
Haven't been able to make this work as yet. I launch

$ pyspark --jars path/to/spark-csv_2.11-1.2.0.jar

and when I attempt to read the csv I get:

Py4JJavaError: An error occurred while calling o22.load. :
java.lang.NoClassDefFoundError: org/apache/commons/csv/CSVFormat ...

Other options I can think of:

- Convert my CSV to json (use Pig?) and read into Spark
- Read in using jdbc connect from postgres

But want to make sure I'm not misusing Spark or missing something obvious.

Thanks!

Charlie


Re: DataFrameWriter.jdbc is very slow

2015-08-20 Thread Michael Armbrust
We will probably fix this in Spark 1.6

https://issues.apache.org/jira/browse/SPARK-10040

On Thu, Aug 20, 2015 at 5:18 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com
 wrote:

 We want to migrate our data (approximately 20M rows) from parquet to postgres.
 When we use the dataframe writer's jdbc method the execution time is very
 large; we tried the same with batch inserts and it was much more effective.
 Is it intentionally implemented in that way?



Re: Data frame created from hive table and its partition

2015-08-20 Thread Michael Armbrust
There is no such thing as primary keys in the Hive metastore, but Spark SQL
does support partitioned hive tables:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables

DataFrameWriter also has a partitionBy method.
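
A short sketch of that writer method (column names and output path are
placeholders); each distinct (year, month) combination ends up in its own
subdirectory:

  df.write
    .partitionBy("year", "month")
    .format("parquet")
    .save("/path/to/output")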

On Thu, Aug 20, 2015 at 7:29 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io
 wrote:

 Hi

 I have a question regarding data frame partition. I read a hive table from
 spark and following spark api converts it as DF.

 test_df = sqlContext.sql(“select * from hivetable1”)

 How does spark decide partition of test_df? Is there a way to partition
 test_df based on some column while reading hive table? Second question is,
 if that hive table has primary key declared, does spark honor PK in hive
 table and partition based on PKs?

 Thanks
 Vijay
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




FAILED_TO_UNCOMPRESS error from Snappy

2015-08-20 Thread Kohki Nishio
Right after upgraded to 1.4.1, we started seeing this exception and yes we
picked up snappy-java-1.1.1.7 (previously snappy-java-1.1.1.6). Is there
anything I could try ? I don't have a repro case.

org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:169)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:168)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:168)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
at
org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
at
org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
at org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)
at
org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
at
org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1178)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
... 33 more


-- 
Kohki Nishio


Re: Saving and loading MLlib models as standalone (no Hadoop)

2015-08-20 Thread Robineast
You can't serialize models out of Spark and then use them outside of the
Spark context. However there is support for the PMML format - have a look at
https://spark.apache.org/docs/latest/mllib-pmml-model-export.html

Robin
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/malak/
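
A minimal sketch of that export path (input file and output paths are
placeholders; toPMML is available on KMeansModel and the linear models in 1.4):

  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors

  val data = sc.textFile("data/points.txt")
    .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  val model = KMeans.train(data, 3, 20)

  model.toPMML("/tmp/kmeans.pmml")                  // local file
  model.toPMML(sc, "hdfs:///models/kmeans.pmml")    // distributed file system
  println(model.toPMML())                           // or just the PMML XML as a String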



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saving-and-loading-MLlib-models-as-standalone-no-Hadoop-tp24216p24371.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: FAILED_TO_UNCOMPRESS error from Snappy

2015-08-20 Thread Ruslan Dautkhanov
https://issues.apache.org/jira/browse/SPARK-7660 ?


-- 
Ruslan Dautkhanov

On Thu, Aug 20, 2015 at 1:49 PM, Kohki Nishio tarop...@gmail.com wrote:

 Right after upgraded to 1.4.1, we started seeing this exception and yes we
 picked up snappy-java-1.1.1.7 (previously snappy-java-1.1.1.6). Is there
 anything I could try ? I don't have a repro case.

 org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:169)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:168)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:168)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
 at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
 at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
 at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
 at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
 at
 org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
 at
 org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
 at
 org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)
 at
 org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
 at
 org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1178)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
 at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
 at scala.util.Try$.apply(Try.scala:161)
 at scala.util.Success.map(Try.scala:206)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
 ... 33 more


 --
 Kohki Nishio



Re: SparkSQL concerning materials

2015-08-20 Thread Dhaval Patel
Or if you're a python lover then this is a good place -
https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html#



On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu yuzhih...@gmail.com wrote:

 See also
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package

 Cheers

 On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif muhammadatif...@gmail.com
 wrote:

 Hi Dawid

  The best place to get started is the Spark SQL Guide from Apache
 http://spark.apache.org/docs/latest/sql-programming-guide.html

 Regards
 Muhammad

 On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz 
 wysakowicz.da...@gmail.com wrote:

 Hi,

 I would like to dip into SparkSQL. Get to know better the architecture,
 good practices, some internals. Could you advise me some materials on this
 matter?

 Regards
 Dawid






Re: How to get the radius of clusters in spark K means

2015-08-20 Thread Ashen Weerathunga
Okay. Thanks.
I already did that and wanted to check whether there is any other method to
extract it from the model itself. Thanks again for the help.

On Thu, Aug 20, 2015 at 8:39 PM, Robin East robin.e...@xense.co.uk wrote:

 There is no cluster radius method on the model returned from K-means.
 You’ll need to roll it yourself by generating the distance from each point
 in the cluster to the cluster center itself and then take the max.
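
A minimal sketch of that roll-your-own approach (model and data are assumed to
be an existing KMeansModel and the RDD[Vector] it was trained on):

  import org.apache.spark.mllib.clustering.KMeansModel
  import org.apache.spark.mllib.linalg.{Vector, Vectors}
  import org.apache.spark.rdd.RDD

  // Radius of each cluster = maximum distance from any of its points to its center.
  def clusterRadii(model: KMeansModel, data: RDD[Vector]): Map[Int, Double] =
    data.map { point =>
      val cluster = model.predict(point)
      val dist = math.sqrt(Vectors.sqdist(point, model.clusterCenters(cluster)))
      (cluster, dist)
    }.reduceByKey((a, b) => math.max(a, b))
     .collect()
     .toMap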


 ---
 Robin East
 *Spark GraphX in Action* Michael Malak and Robin East
 Manning Publications Co.
 http://www.manning.com/malak/

 On 20 Aug 2015, at 07:14, ashensw as...@wso2.com wrote:

 We can get cluster centers in K means clustering. Like wise is there any
 method in spark to get the cluster radius?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-radius-of-clusters-in-spark-K-means-tp24353.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
*Ashen Weerathunga*
Software Engineer - Intern
WSO2 Inc.: http://wso2.com
lean.enterprise.middleware

Email: as...@wso2.com
Mobile: +94 716042995 94716042995
LinkedIn:
*http://lk.linkedin.com/in/ashenweerathunga
http://lk.linkedin.com/in/ashenweerathunga*


How to list all dataframes and RDDs available in current session?

2015-08-20 Thread Dhaval Patel
Hi:

I have been working on few example using zeppelin.

I have been trying to find a command that would list all *dataframes/RDDs*
that has been created in current session. Anyone knows if there is any such
commands available?

Something similar to SparkSQL to list all temp tables :
  show tables;

Thanks,
Dhaval
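
As far as I know there is no single command that lists every DataFrame/RDD
variable in a session (they are ordinary driver-side objects), but two related
things can be enumerated; a small sketch:

  // Temp tables registered via registerTempTable (the SQL-side equivalent of "show tables")
  sqlContext.tableNames().foreach(println)

  // RDDs that have been explicitly cached/persisted in this SparkContext
  sc.getPersistentRDDs.foreach { case (id, rdd) => println(s"$id -> ${rdd.name}") }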


MLlib Prefixspan implementation

2015-08-20 Thread alexis GILLAIN
I want to use prefixspan so I had a look at the code and the cited paper :
Distributed PrefixSpan Algorithm Based on MapReduce.

There is a result in the paper I didn't really understand and I couldn't
find where it is used in the code.

Suppose a sequence database S = {s1, s2, ..., sn}, a sequence a... is a
length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a
prefix of a length-(L-1) sequential pattern a...a, when the support count
of a is not less than min_support, it is equal to obtaining a length-L
sequential pattern  a ... a  from projected databases that obtaining a
length-L sequential pattern  a ... a  from a sequence database S.

According to the paper It's supposed to add a pruning step in the reduce
function but I couldn't find where.

This result seems to come from a previous paper : Wang Linlin, Fan Jun.
Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan [J].
Computer Engineering, 2009, 35(23): 56-61 but it didn't help me to
understand it and how it can improve the algorithm.


SparkR - can't create spark context - JVM not ready

2015-08-20 Thread Deborah Siegel
Hello,

I have previously successfully run SparkR in RStudio, with:

Sys.setenv(SPARK_HOME = "~/software/spark-1.4.1-bin-hadoop2.4")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local[2]", appName="SparkR-example")


Then I tried putting some of it into an .Rprofile. It seemed to work to
load the paths and SparkR, but I got an error when trying to create the sc.
I then removed my .Rprofile, as well as .rstudio-desktop. However, I still
cannot create the sc. Here is the error

 sc - sparkR.init(master=local[2],appName=SparkR-example)
Launching java with spark-submit command
~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit   sparkr-shell
/var/folders/p7/k1bpgmx93yd6pjq7dzf35gk8gn/T//RtmpOitA28/backend_port23377046db
sh: ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit: No such file or
directory
Error in sparkR.init(master = local[2], appName = SparkR-example) :
JVM is not ready after 10 seconds
I suspected there was an incomplete process or something. I checked for any
running R or Java processes and there were none. Has someone seen this type
of error? I have the same error in both RStudio and in R shell (but not
sparkR wrapper).

Thanks,
Deb


Spark SQL window functions (RowsBetween)

2015-08-20 Thread Mike Trienis
Hi All,

I would like some clarification regarding window functions for Apache Spark
1.4.0

   -
   
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

In particular, the rowsBetween

   * {{{
   *   val w = Window.partitionBy(name).orderBy(id)
   *   df.select(
   * sum(price).over(w.rangeBetween(Long.MinValue, 2)),
   * avg(price).over(w.rowsBetween(0, 4))
   *   )
   * }}}


Are any of the window functions available without a hive context? If the
answer is no, then is there any other way to accomplish this without using
hive?

I need to compare the i[th] row with the [i-1]th row of col2 (sorted by
col1). If item_i of the i[th] row and item_[i-1] of the [i-1]th row are
different, then I need to increment the count of item_[i-1] by 1.


col1| col2
--
1| item_1
2| item_1
3| item_2
4| item_1
5| item_2
6| item_1

In the above example, if we scan two rows at a time downwards,  we see that
row 2 and row 3 are different therefore we add one to item_1. Next, we see
that row 3 is different from row 4, then add one to item_2. Continue until
we end up with:

 col2  | col3
---
 item_1  | 2
 item_2  | 2

Thanks, Mike.
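
A rough sketch of that comparison using the lag() window function instead of
rowsBetween (in Spark 1.4 window functions still require a HiveContext, so df
here is assumed to come from one; note that Window.orderBy without partitionBy
pulls all rows into a single partition):

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions._

  val w = Window.orderBy("col1")
  val withPrev = df.withColumn("prev_item", lag("col2", 1).over(w))

  val counts = withPrev
    .where(col("prev_item").isNotNull)
    .where(col("col2") !== col("prev_item"))
    .groupBy("prev_item")
    .count()

  counts.show()   // item_1 -> 2, item_2 -> 2 for the sample data above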