Re: spark kafka partitioning

2015-08-20 Thread Gaurav Agarwal
When I send messages from a Kafka topic having three partitions,

Spark will listen for the messages when I call KafkaUtils.createStream or
createDirectStream with local[4].
Now I want to see whether Spark creates partitions when it receives
messages from Kafka through the DStream; how, where, and in which method of
the Spark API do I have to look to find out?

On 8/21/15, Gaurav Agarwal  wrote:
> Hello
>
> Regarding Spark Streaming and Kafka Partitioning
>
> When i send message on kafka topic with 3 partitions and listens on
> kafkareceiver with local value[4] . how will i come to know in Spark
> Streaming that different Dstreams are created according to partitions of
> kafka messages .
>
> Thanks
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkSQL concerning materials

2015-08-20 Thread Dawid Wysakowicz
Hi,

thanks for the answers. I have read the answers you provided, but I am rather looking
for some materials on the internals, e.g. how the optimizer works, how a query
is translated into RDD operations, etc. I am quite familiar with the API.
A good starting point for me was: Spark DataFrames: Simple and Fast
Analysis of Structured Data


2015-08-20 18:29 GMT+02:00 Dhaval Patel :

> Or if you're a python lover then this is a good place -
> https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html#
>
>
>
> On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu  wrote:
>
>> See also
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package
>>
>> Cheers
>>
>> On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif > > wrote:
>>
>>> Hi Dawid
>>>
>>> The best place to get started is the Spark SQL Guide from Apache
>>> http://spark.apache.org/docs/latest/sql-programming-guide.html
>>>
>>> Regards
>>> Muhammad
>>>
>>> On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz <
>>> wysakowicz.da...@gmail.com> wrote:
>>>
 Hi,

 I would like to dip into SparkSQL. Get to know better the architecture,
 good practices, some internals. Could you advise me some materials on this
 matter?

 Regards
 Dawid

>>>
>>>
>>
>


Spark-Cassandra-connector

2015-08-20 Thread Samya
Hi All,

I need to write an RDD to Cassandra using the spark-cassandra-connector from
DataStax. My application is running on Yarn.

*Some basic questions:*
1.  Will a call to saveToCassandra(...) use the same connection
object across all tasks in a given executor? I mean, is there 1 (one)
connection object per executor that is shared between tasks?
2. If the above answer is YES, is there a way to create a connection pool for
each executor, so that multiple tasks can write data to Cassandra in parallel?

Regards,
Samya
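
For context, a minimal sketch of the kind of call being asked about, using the
DataStax spark-cassandra-connector Scala API (the keyspace, table, and column
names below are illustrative; as far as I understand, the connector caches and
shares Cassandra sessions within each executor JVM):

import com.datastax.spark.connector._     // adds saveToCassandra to RDDs

case class Reading(sensorId: String, ts: Long, value: Double)

// "ks" and "readings" are placeholder keyspace/table names.
val readings = sc.parallelize(Seq(Reading("s1", 1L, 0.5), Reading("s2", 2L, 1.5)))
readings.saveToCassandra("ks", "readings", SomeColumns("sensor_id", "ts", "value"))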



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cassandra-connector-tp24378.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkR csv without headers

2015-08-20 Thread Franc Carter
Thanks - works nicely

cheers

On Fri, Aug 21, 2015 at 12:43 PM, Sun, Rui  wrote:

> Hi,
>
>
>
> You can create a DataFrame using load.df() with a specified schema.
>
>
>
> Something like:
>
> schema <- structType(structField(“a”, “string”), structField(“b”,
> integer), …)
>
> read.df ( …, schema = schema)
>
>
>
> *From:* Franc Carter [mailto:franc.car...@rozettatech.com]
> *Sent:* Wednesday, August 19, 2015 1:48 PM
> *To:* user@spark.apache.org
> *Subject:* SparkR csv without headers
>
>
>
>
>
> Hi,
>
>
>
> Does anyone have an example of how to create a DataFrame in SparkR  which
> specifies the column names - the csv files I have do not have column names
> in the first row. I can get read a csv nicely with
> com.databricks:spark-csv_2.10:1.0.3, but I end up with column names C1, C2,
> C3 etc
>
>
>
>
>
> thanks
>
>
>
> --
>
> *Franc Carter* I  Systems ArchitectI RoZetta Technology
>
>
>
>
>
>
> L4. 55 Harrington Street,  THE ROCKS,  NSW, 2000
>
> PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA
>
> *T*  +61 2 8355 2515 Iwww.rozettatechnology.com
>
>
> DISCLAIMER: The contents of this email, inclusive of attachments, may be
> legally
>
> privileged and confidential. Any unauthorised use of the contents is
> expressly prohibited.
>
>
>
>
>



-- 

*Franc Carter* I  Systems ArchitectI RoZetta Technology






L4. 55 Harrington Street,  THE ROCKS,  NSW, 2000

PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA

*T*  +61 2 8355 2515 Iwww.rozettatechnology.com


DISCLAIMER: The contents of this email, inclusive of attachments, may be
legally

privileged and confidential. Any unauthorised use of the contents is
expressly prohibited.


Re: SparkR - can't create spark context - JVM not ready

2015-08-20 Thread Deborah Siegel
Thanks Shivaram. You got me wondering about the path so I put it in full
and it worked. R does not, of course, expand a "~".

On Thu, Aug 20, 2015 at 4:35 PM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> Can you check if the file
> `~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit` exists ? The
> error message seems to indicate it is trying to pick up Spark from
> that location and can't seem to find Spark installed there.
>
> Thanks
> Shivaram
>
> On Thu, Aug 20, 2015 at 3:30 PM, Deborah Siegel
>  wrote:
> > Hello,
> >
> > I have previously successfully run SparkR in RStudio, with:
> >
> >>Sys.setenv(SPARK_HOME="~/software/spark-1.4.1-bin-hadoop2.4")
> >>.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"),
> .libPaths()))
> >>library(SparkR)
> >>sc <- sparkR.init(master="local[2]",appName="SparkR-example")
> >
> >
> > Then I tried putting some of it into an .Rprofile. It seemed to work to
> load
> > the paths and SparkR, but I got an error when trying to create the sc. I
> > then removed my .Rprofile, as well as .rstudio-desktop. However, I still
> > cannot create the sc. Here is the error
> >
> >> sc <- sparkR.init(master="local[2]",appName="SparkR-example")
> > Launching java with spark-submit command
> > ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit   sparkr-shell
> >
> /var/folders/p7/k1bpgmx93yd6pjq7dzf35gk8gn/T//RtmpOitA28/backend_port23377046db
> > sh: ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit: No such file
> or
> > directory
> > Error in sparkR.init(master = "local[2]", appName = "SparkR-example") :
> > JVM is not ready after 10 seconds
> >
> > I suspected there was an incomplete process or something. I checked for
> any
> > running R or Java processes and there were none. Has someone seen this
> type
> > of error? I have the same error in both RStudio and in R shell (but not
> > sparkR wrapper).
> >
> > Thanks,
> > Deb
> >
> >
>


Re: Convert mllib.linalg.Matrix to Breeze

2015-08-20 Thread Naveen

Hi,

Thanks for the reply. I tried Matrix.toBreeze(), which fails with the
following error:


method toBreeze in trait Matrix cannot be accessed in
org.apache.spark.mllib.linalg.Matrix



On Thursday 20 August 2015 07:50 PM, Burak Yavuz wrote:
Matrix.toBreeze is a private method. MLlib matrices have the same 
structure as Breeze Matrices. Just create a new Breeze matrix like 
this 
. 



Best,
Burak
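
A minimal sketch of that manual conversion (hedged: it assumes a dense mllib
matrix; both mllib matrices and Breeze DenseMatrix store their values in
column-major order, so the backing array can be reused):

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}

def toBreezeDense(m: Matrix): BDM[Double] = m match {
  // dense case: hand the column-major values array straight to Breeze
  case dm: DenseMatrix => new BDM[Double](dm.numRows, dm.numCols, dm.values)
  // fallback: toArray also returns the values in column-major order
  case other => new BDM[Double](other.numRows, other.numCols, other.toArray)
}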


On Thu, Aug 20, 2015 at 3:28 AM, Yanbo Liang > wrote:


You can use Matrix.toBreeze()


 .

2015-08-20 18:24 GMT+08:00 Naveen mailto:nav...@formcept.com>>:

Hi All,

Is there anyway to convert a mllib matrix to a Dense Matrix of
Breeze? Any leads are appreciated.


Thanks,
Naveen

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org








Re: spark kafka partitioning

2015-08-20 Thread ayan guha
If you have 1 topic, that means you have 1 DStream, which will have a
series of RDDs, one for each batch interval. In the receiver-based integration,
there is no direct relationship between Kafka partitions and Spark partitions.
In the direct approach, 1 Spark partition will be created for each Kafka partition.
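
For illustration, a minimal sketch of the direct approach in Scala (the broker
address and topic name are hypothetical; with createDirectStream, each of the 3
Kafka partitions becomes one Spark partition in every batch RDD):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // illustrative broker
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))                              // 3-partition topic
stream.foreachRDD { rdd =>
  // with the direct stream, rdd.partitions.size equals the number of Kafka partitions
  println(s"partitions in this batch: ${rdd.partitions.size}")
}
ssc.start()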

On Fri, Aug 21, 2015 at 12:48 PM, Gaurav Agarwal 
wrote:

> Hello
>
> Regarding Spark Streaming and Kafka Partitioning
>
> When i send message on kafka topic with 3 partitions and listens on
> kafkareceiver with local value[4] . how will i come to know in Spark
> Streaming that different Dstreams are created according to partitions of
> kafka messages .
>
> Thanks
>
>


-- 
Best Regards,
Ayan Guha


Re: spark kafka partitioning

2015-08-20 Thread Cody Koeninger
I'm not clear on your question, can you rephrase it?  Also, are you talking
about createStream or createDirectStream?

On Thu, Aug 20, 2015 at 9:48 PM, Gaurav Agarwal 
wrote:

> Hello
>
> Regarding Spark Streaming and Kafka Partitioning
>
> When i send message on kafka topic with 3 partitions and listens on
> kafkareceiver with local value[4] . how will i come to know in Spark
> Streaming that different Dstreams are created according to partitions of
> kafka messages .
>
> Thanks
>
>


RE: Any suggestion about "sendMessageReliably failed because ack was not received within 120 sec"

2015-08-20 Thread java8964
The closest information I could find online related to this error is
https://issues.apache.org/jira/browse/SPARK-3633
But it is quite different in our case. In our case, we never saw the "(Too many 
open files)" error; the log simply shows the 120 sec timeout.
I checked the GC output from all 42 executors; the max full GC real=11.79 
secs is all I can find, way less than the 120 second timeout.
Out of the 42 executors, one executor's stdout/stderr page hangs; I cannot 
see any GC or log information for this executor, but it is shown as "LOADING" 
in the master page. I think the reason is just that the WorkerUI could not bind 
to 8081 somehow during boot time and bound to 8082 instead, and the master UI 
didn't catch that information.
Anyway, my only option now is to increase the timeouts of both 
"spark.core.connection.ack.wait.timeout" and "spark.akka.timeout" to 600, as 
suggested in the JIRA, and I will report back what I find later.
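
For reference, a sketch of how those two settings could be applied when the
context is built (600 is the value mentioned above; the property names are the
Spark 1.3-era ones):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.core.connection.ack.wait.timeout", "600")  // ack timeout, in seconds
  .set("spark.akka.timeout", "600")                      // akka timeout, in seconds
val sc = new SparkContext(conf)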
This same daily job runs about 12 hours in Hive/MR, and can finish in about 4 
hours in Spark (with 25% of the cluster resources allocated). On this point, Spark is 
faster and great, but only IF (big IF) every task runs smoothly.
In Hive/MR, once the job is set up, it will finish, maybe slowly, but smoothly. In 
Spark, in this case, it does retry only the failed partitions, but we sometimes saw 4 or 
5 rounds of retries, making it in fact much, much slower.
Yong
From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Any suggestion about "sendMessageReliably failed because ack was not 
received within 120 sec"
Date: Thu, 20 Aug 2015 20:49:52 -0400




Hi, Sparkers:
After first 2 weeks of Spark in our production cluster, with more familiar with 
Spark, we are more confident to avoid "Lost Executor" due to memory issue. So 
far, most of our jobs won't fail or slow down due to "Lost executor".
But sometimes, I observed that individual tasks failed due to 
"sendMessageReliably failed because ack was not received within 120 sec". 
Here is the basic information:
Spark 1.3.1 in 1 master + 42 worker boxes in standalone deploymentThe cluster 
also runs Hadoop + MapReduce, so we allocate about 25% resource to Spark. We 
are conservative for the Spark jobs, with low number of cores  + big 
parallelism/partitions to control the memory usage in the job, so far we are 
happen to avoid "lost executor".
We have one big daily job is running with following configuration:
/opt/spark/bin/spark-shell --jars spark-avro.jar --conf spark.ui.port=4042 
--executor-memory 20G --total-executor-cores 168 --conf 
spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=6000 
--conf spark.default.parallelism=6000 --conf 
spark.shuffle.blockTransferService=nio -i spark.script
168 cores will make each executor run with 4 thread (168 / 42 = 4)There is no 
cache needed, so I make the storage memoryFraction very lownio is much robust 
than netty in our experience
For this big daily job generating over 2 of tasks, they all could finish 
without this issue, but sometimes, for the same job, tasks keep failing due to 
this error and retry.
But even in this case, I saw the task failed due to this error and retry. Retry 
maybe part of life for distribute environment, but I want to know what root 
cause could behind it and how to avoid it.
Do I increase "spark.core.connection.ack.wait.timeout" to fix this error? When 
this happened, I saw there is no executor lost, all are alive. 
Below is the message in the log, for example, it complained about timeout to 
connect to host-121.
FetchFailed(BlockManagerId(31, host-121, 38930), shuffleId=3, mapId=17, 
reduceId=2577, message=org.apache.spark.shuffle.FetchFailedException: 
sendMessageReliably failed because ack was not received within 120 sec  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)  at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)  
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)  
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)  at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:154)
  at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:149)
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)  at 
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)  at 
org.apache.s

spark kafka partitioning

2015-08-20 Thread Gaurav Agarwal
Hello

Regarding Spark Streaming and Kafka Partitioning

When i send message on kafka topic with 3 partitions and listens on
kafkareceiver with local value[4] . how will i come to know in Spark
Streaming that different Dstreams are created according to partitions of
kafka messages .

Thanks


RE: SparkR csv without headers

2015-08-20 Thread Sun, Rui
Hi,

You can create a DataFrame using load.df() with a specified schema.

Something like:
schema <- structType(structField("a", "string"), structField("b", "integer"), …)
read.df(…, schema = schema)

From: Franc Carter [mailto:franc.car...@rozettatech.com]
Sent: Wednesday, August 19, 2015 1:48 PM
To: user@spark.apache.org
Subject: SparkR csv without headers


Hi,

Does anyone have an example of how to create a DataFrame in SparkR which 
specifies the column names? The csv files I have do not have column names in 
the first row. I can read a csv nicely with 
com.databricks:spark-csv_2.10:1.0.3, but I end up with column names C1, C2, C3 
etc.


thanks

--

Franc Carter I  Systems ArchitectI RoZetta Technology






L4. 55 Harrington Street,  THE ROCKS,  NSW, 2000

PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA

T  +61 2 8355 2515 I
www.rozettatechnology.com


DISCLAIMER: The contents of this email, inclusive of attachments, may be legally

privileged and confidential. Any unauthorised use of the contents is expressly 
prohibited.




Re: what determine the task size?

2015-08-20 Thread ambujhbti
cwz wrote
> sorry, my question is not clear.
> I mean what determine the one task size? not how many tasks

one task size = one HDFS block size (for an HDFS-backed RDD, one task is created
per input split, which by default corresponds to one block).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-determine-the-task-size-tp24363p24375.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka Spark Partition Mapping

2015-08-20 Thread Cody Koeninger
In general you cannot guarantee which node an RDD will be processed on.

The preferred location for a KafkaRDD partition is the Kafka leader for that
partition, if they're deployed on the same machines. If you want to try to
override that behavior, the method is getPreferredLocations.

But even in that case, location preferences are just a scheduler hint; the
RDD can still be scheduled elsewhere. You can turn up spark.locality.wait
to a very high value to decrease the likelihood.
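
A sketch of that knob (the value is purely illustrative, and the "30s" time
format assumes Spark 1.4 or later):

import org.apache.spark.SparkConf

// A higher spark.locality.wait makes the scheduler hold out longer for the
// preferred (Kafka-leader) host before running the task on another node.
val conf = new SparkConf().set("spark.locality.wait", "30s")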



On Thu, Aug 20, 2015 at 5:47 PM, nehalsyed 
wrote:

> I have data in Kafka topic-partition and I am reading it from Spark like
> this: JavaPairReceiverInputDStream directKafkaStream =
> KafkaUtils.createDirectStream(streamingContext, [key class], [value class],
> [key decoder class], [value decoder class], [map of Kafka parameters], [set
> of topics to consume]); I want that message from a kafka partition always
> land on same machine on Spark rdd so I can cache some decoration data
> locally and later reuse with other messages (that belong to same key). Can
> anyone tell me how can I achieve it? Thanks
> --
> View this message in context: Kafka Spark Partition Mapping
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Any suggestion about "sendMessageReliably failed because ack was not received within 120 sec"

2015-08-20 Thread java8964
Hi, Sparkers:
After the first 2 weeks of Spark in our production cluster, as we have become more 
familiar with Spark, we are more confident about avoiding "Lost Executor" due to 
memory issues. So far, most of our jobs won't fail or slow down due to a "Lost executor".
But sometimes, I observed that individual tasks failed due to 
"sendMessageReliably failed because ack was not received within 120 sec". 
Here is the basic information:
- Spark 1.3.1, 1 master + 42 worker boxes in standalone deployment.
- The cluster also runs Hadoop + MapReduce, so we allocate about 25% of the 
resources to Spark.
- We are conservative with the Spark jobs, using a low number of cores + big 
parallelism/partitions to control the memory usage in the job; so far we have 
managed to avoid "lost executor".
We have one big daily job running with the following configuration:
/opt/spark/bin/spark-shell --jars spark-avro.jar --conf spark.ui.port=4042 
--executor-memory 20G --total-executor-cores 168 --conf 
spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=6000 
--conf spark.default.parallelism=6000 --conf 
spark.shuffle.blockTransferService=nio -i spark.script
- 168 cores will make each executor run with 4 threads (168 / 42 = 4).
- There is no cache needed, so I make the storage memoryFraction very low.
- nio is much more robust than netty in our experience.
For this big daily job, generating over 2 of tasks, they can all finish 
without this issue, but sometimes, for the same job, tasks keep failing due to 
this error and retrying.
Even in this case, I saw the tasks fail due to this error and retry. Retries 
may be part of life in a distributed environment, but I want to know what root 
cause could be behind it and how to avoid it.
Should I increase "spark.core.connection.ack.wait.timeout" to fix this error? When 
this happened, I saw that no executors were lost; all were alive. 
Below is the message in the log; for example, it complained about a timeout 
connecting to host-121.
FetchFailed(BlockManagerId(31, host-121, 38930), shuffleId=3, mapId=17, 
reduceId=2577, message=org.apache.spark.shuffle.FetchFailedException: 
sendMessageReliably failed because ack was not received within 120 sec  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)  at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)  
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)  
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)  at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)  at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:154)
  at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:149)
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)  at 
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)  at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)  at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)  at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)  at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)  at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:244)  at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)  at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)  at 
org.apache.spark.scheduler.Task.run(Task.scala:64)  at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
 at java.lang.Thread.run(Thread.java:745)Caused by: java.io.IOException: 
sendMessageReliably failed because ack was not received within 120 sec  at 
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:929)
  at 
org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:928)
  at scala.Option.foreach(Option.scala:236)  at 
org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:928)
  at 
io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
  at 
io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
  at io.netty.util.HashedWheelTimer$Worker.run(Hashed

Re: SparkR - can't create spark context - JVM not ready

2015-08-20 Thread Shivaram Venkataraman
Can you check if the file
`~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit` exists ? The
error message seems to indicate it is trying to pick up Spark from
that location and can't seem to find Spark installed there.

Thanks
Shivaram

On Thu, Aug 20, 2015 at 3:30 PM, Deborah Siegel
 wrote:
> Hello,
>
> I have previously successfully run SparkR in RStudio, with:
>
>>Sys.setenv(SPARK_HOME="~/software/spark-1.4.1-bin-hadoop2.4")
>>.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
>>library(SparkR)
>>sc <- sparkR.init(master="local[2]",appName="SparkR-example")
>
>
> Then I tried putting some of it into an .Rprofile. It seemed to work to load
> the paths and SparkR, but I got an error when trying to create the sc. I
> then removed my .Rprofile, as well as .rstudio-desktop. However, I still
> cannot create the sc. Here is the error
>
>> sc <- sparkR.init(master="local[2]",appName="SparkR-example")
> Launching java with spark-submit command
> ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit   sparkr-shell
> /var/folders/p7/k1bpgmx93yd6pjq7dzf35gk8gn/T//RtmpOitA28/backend_port23377046db
> sh: ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit: No such file or
> directory
> Error in sparkR.init(master = "local[2]", appName = "SparkR-example") :
> JVM is not ready after 10 seconds
>
> I suspected there was an incomplete process or something. I checked for any
> running R or Java processes and there were none. Has someone seen this type
> of error? I have the same error in both RStudio and in R shell (but not
> sparkR wrapper).
>
> Thanks,
> Deb
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-20 Thread Philip Weaver
I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and
I couldn't find much information about it online. What does it mean exactly
to disable it? Are there any negative consequences to disabling it?
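
For reference, a sketch of how the flag discussed in the quoted reply below can
be toggled (assuming an existing SQLContext named sqlContext):

// Turn off driver-side partition discovery for partitioned data sources.
sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", "false")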

On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao  wrote:

> Can you make some more profiling? I am wondering if the driver is busy
> with scanning the HDFS / S3.
>
> Like jstack 
>
>
>
> And also, it’s will be great if you can paste the physical plan for the
> simple query.
>
>
>
> *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
> *Sent:* Thursday, August 20, 2015 1:46 PM
> *To:* Cheng, Hao
> *Cc:* Philip Weaver; user
> *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
> partitions
>
>
>
> I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of
> CLs trying to speed up spark sql with tables with a huge number of
> partitions, I've made sure that those CLs are included but it's still very
> slow
>
>
>
> On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao  wrote:
>
> Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to
> false.
>
>
>
> BTW, which version are you using?
>
>
>
> Hao
>
>
>
> *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
> *Sent:* Thursday, August 20, 2015 12:16 PM
> *To:* Philip Weaver
> *Cc:* user
> *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
> partitions
>
>
>
> I guess the question is why does spark have to do partition discovery with
> all partitions when the query only needs to look at one partition? Is there
> a conf flag to turn this off?
>
>
>
> On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver 
> wrote:
>
> I've had the same problem. It turns out that Spark (specifically parquet)
> is very slow at partition discovery. It got better in 1.5 (not yet
> released), but was still unacceptably slow. Sadly, we ended up reading
> parquet files manually in Python (via C++) and had to abandon Spark SQL
> because of this problem.
>
>
>
> On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang 
> wrote:
>
> Hi all,
>
>
>
> I did a simple experiment with Spark SQL. I created a partitioned parquet
> table with only one partition (date=20140701). A simple `select count(*)
> from table where date=20140701` would run very fast (0.1 seconds). However,
> as I added more partitions the query takes longer and longer. When I added
> about 10,000 partitions, the query took way too long. I feel like querying
> for a single partition should not be affected by having more partitions. Is
> this a known behaviour? What does spark try to do here?
>
>
>
> Thanks,
>
> Jerrick
>
>
>
>
>
>
>


Kafka Spark Partition Mapping

2015-08-20 Thread nehalsyed
I have data in Kafka topic-partitions and I am reading it from Spark like
this:

JavaPairReceiverInputDStream directKafkaStream =
KafkaUtils.createDirectStream(streamingContext, [key class], [value class],
[key decoder class], [value decoder class], [map of Kafka parameters],
[set of topics to consume]);

I want messages from a Kafka partition to always land on the same machine in
the Spark RDD, so I can cache some decoration data locally and later reuse it
with other messages (that belong to the same key). Can anyone tell me how I can
achieve this? Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Partition-Mapping-tp24372.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark SQL window functions (RowsBetween)

2015-08-20 Thread Mike Trienis
Hi All,

I would like some clarification regarding window functions for Apache Spark
1.4.0

   -
   
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

In particular, the "rowsBetween" example:

   * {{{
   *   val w = Window.partitionBy("name").orderBy("id")
   *   df.select(
   * sum("price").over(w.rangeBetween(Long.MinValue, 2)),
   * avg("price").over(w.rowsBetween(0, 4))
   *   )
   * }}}


Are any of the window functions available without a hive context? If the
answer is no, then is there any other way to accomplish this without using
hive?

I need to compare the i[th] row with the [i-1]th row of col2 (sorted by
col1). If item_i of the i[th] row and item_[i-1] of the [i-1]th row are
different, then I need to increment the count of item_[i-1] by 1.


col1| col2
--
1| item_1
2| item_1
3| item_2
4| item_1
5| item_2
6| item_1

In the above example, if we scan two rows at a time downwards,  we see that
row 2 and row 3 are different therefore we add one to item_1. Next, we see
that row 3 is different from row 4, then add one to item_2. Continue until
we end up with:

 col2  | col3
---
 item_1  | 2
 item_2  | 2

Thanks, Mike.
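
As far as I know, in Spark 1.4 window functions such as lag are only resolved
through a HiveContext. Under that assumption, a sketch of the row-to-previous-row
comparison described above (df, col1, col2 as in the example):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.orderBy("col1")
val counts = df
  .withColumn("prev", lag("col2", 1).over(w))                    // col2 of the previous row
  .where(col("prev").isNotNull && (col("col2") !== col("prev"))) // keep only the changes
  .groupBy("prev").agg(count("prev").as("col3"))                 // 2 for item_1, 2 for item_2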


SparkR - can't create spark context - JVM not ready

2015-08-20 Thread Deborah Siegel
Hello,

I have previously successfully run SparkR in RStudio, with:

>Sys.setenv(SPARK_HOME="~/software/spark-1.4.1-bin-hadoop2.4")
>.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
>library(SparkR)
>sc <- sparkR.init(master="local[2]",appName="SparkR-example")


Then I tried putting some of it into an .Rprofile. It seemed to work to
load the paths and SparkR, but I got an error when trying to create the sc.
I then removed my .Rprofile, as well as .rstudio-desktop. However, I still
cannot create the sc. Here is the error

> sc <- sparkR.init(master="local[2]",appName="SparkR-example")
Launching java with spark-submit command
~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit   sparkr-shell
/var/folders/p7/k1bpgmx93yd6pjq7dzf35gk8gn/T//RtmpOitA28/backend_port23377046db
sh: ~/software/spark-1.4.1-bin-hadoop2.4/bin/spark-submit: No such file or
directory
Error in sparkR.init(master = "local[2]", appName = "SparkR-example") :
JVM is not ready after 10 seconds
I suspected there was an incomplete process or something. I checked for any
running R or Java processes and there were none. Has someone seen this type
of error? I have the same error in both RStudio and in R shell (but not
sparkR wrapper).

Thanks,
Deb


Re: FAILED_TO_UNCOMPRESS error from Snappy

2015-08-20 Thread Ruslan Dautkhanov
https://issues.apache.org/jira/browse/SPARK-7660 ?


-- 
Ruslan Dautkhanov

On Thu, Aug 20, 2015 at 1:49 PM, Kohki Nishio  wrote:

> Right after upgraded to 1.4.1, we started seeing this exception and yes we
> picked up snappy-java-1.1.1.7 (previously snappy-java-1.1.1.6). Is there
> anything I could try ? I don't have a repro case.
>
> org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
> at
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
> at
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
> at
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:169)
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:168)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:168)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
> at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
> at
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
> at
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
> at
> org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58)
> at
> org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
> at
> org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1178)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
> at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
> at scala.util.Try$.apply(Try.scala:161)
> at scala.util.Success.map(Try.scala:206)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
> ... 33 more
>
>
> --
> Kohki Nishio
>


Re: Saving and loading MLlib models as standalone (no Hadoop)

2015-08-20 Thread Robineast
You can't serialize models out of Spark and then use them outside of the
Spark context. However there is support for the PMML format - have a look at
https://spark.apache.org/docs/latest/mllib-pmml-model-export.html

Robin
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/malak/
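
A minimal sketch of that export path (hedged: it assumes one of the model types
that mixes in PMMLExportable, e.g. k-means, and `vectors` is a hypothetical
RDD[Vector] of training data):

import org.apache.spark.mllib.clustering.KMeans

val model = KMeans.train(vectors, 3, 20)   // k = 3, maxIterations = 20
model.toPMML("/tmp/kmeans.pmml")           // writes PMML XML usable outside Spark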



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saving-and-loading-MLlib-models-as-standalone-no-Hadoop-tp24216p24371.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Sandy Ryza
GC wouldn't necessarily result in errors - it could just be slowing down
your job and causing the executor JVMs to stall.  If you click on a stage
in the UI, you should end up on a page with all the metrics concerning the
tasks that ran in that stage.  "GC Time" is one of these task metrics.

-Sandy

On Thu, Aug 20, 2015 at 8:54 AM, Umesh Kacha  wrote:

> Hi where do I see GC time in UI? I have set spark.yarn.executor.memoryOverhead
> as 3500 which seems to be good enough I believe. So you mean only GC could
> be the reason behind timeout I checked Yarn logs I did not see any GC error
> there. Please guide. Thanks much.
>
> On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza 
> wrote:
>
>> Moving this back onto user@
>>
>> Regarding GC, can you look in the web UI and see whether the "GC time"
>> metric dominates the amount of time spent on each task (or at least the
>> tasks that aren't completing)?
>>
>> Also, have you tried bumping your spark.yarn.executor.memoryOverhead?
>> YARN may be killing your executors for using too much off-heap space.  You
>> can see whether this is happening by looking in the Spark AM or YARN
>> NodeManager logs.
>>
>> -Sandy
>>
>> On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha 
>> wrote:
>>
>>> Hi thanks much for the response. Yes I tried default settings too 0.2 it
>>> was also going into timeout if it is spending time in GC then why it is not
>>> throwing GC error I don't see any such error. Yarn logs are not helpful at
>>> all. What is tungsten how do I use it? Spark is doing great I believe my
>>> job runs successfully and 60% tasks completes only after first executor
>>> gets lost things are messing.
>>> On Aug 20, 2015 7:59 PM, "Sandy Ryza"  wrote:
>>>
 What sounds most likely is that you're hitting heavy garbage
 collection.  Did you hit issues when the shuffle memory fraction was at its
 default of 0.2?  A potential danger with setting the shuffle storage to 0.7
 is that it allows shuffle objects to get into the GC old generation, which
 triggers more stop-the-world garbage collections.

 Have you tried enabling Tungsten / unsafe?

 Unfortunately, Spark is still not that great at dealing with
 heavily-skewed shuffle data, because its reduce-side aggregation still
 operates on Java objects instead of binary data.

 -Sandy

 On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha 
 wrote:

> Hi Sandy thanks much for the response. I am using Spark 1.4.1 and I
> have set spark.shuffle.storage as 0.7 as my spark job involves 4 groupby
> queries executed using hiveContext.sql my data set is skewed so will be
> more shuffling I believe I don't know what's wrong spark job runs fine for
> almost an hour and when shuffle read shuffle write column in UI starts to
> show more than 10 gb executor starts to getting lost because of timeout 
> and
> slowly other executor starts getting lost. Please guide.
> On Aug 20, 2015 7:38 PM, "Sandy Ryza"  wrote:
>
>> What version of Spark are you using?  Have you set any shuffle
>> configs?
>>
>> On Wed, Aug 19, 2015 at 11:46 AM, unk1102 
>> wrote:
>>
>>> I have one Spark job which seems to run fine but after one hour or so
>>> executor start getting lost because of time out something like the
>>> following
>>> error
>>>
>>> cluster.yarnScheduler : Removing an executor 14 65 timeout
>>> exceeds
>>> 60 seconds
>>>
>>> and because of above error couple of chained errors starts to come
>>> like
>>> FetchFailedException, Rpc client disassociated, Connection reset by
>>> peer,
>>> IOException etc
>>>
>>> Please see the following UI page I have noticed when shuffle
>>> read/write
>>> starts to increase more than 10 GB executors starts getting lost
>>> because of
>>> timeout. How do I clear this stacked memory of 10 GB in shuffle
>>> read/write
>>> section I dont cache anything why Spark is not clearing those
>>> memory. Please
>>> guide.
>>>
>>> IMG_20150819_231418358.jpg
>>> <
>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
>>> >
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
>>> Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>

>>
>


org.apache.hadoop.security.AccessControlException: Permission denied when access S3

2015-08-20 Thread Shuai Zheng
Hi All,

 

I am trying to access a file on S3 through the Hadoop file API:

 

Below is my code:

 

Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.awsAccessKeyId", this.getAwsAccessKeyId());
hadoopConf.set("fs.s3n.awsSecretAccessKey", this.getAwsSecretAccessKey());

lines = ctx.newAPIHadoopFile(inputPath, NonSplitableTextInputFormat.class,
        LongWritable.class, Text.class, hadoopConf)
    .values()
    .map(new Function<Text, String>() {
        @Override
        public String call(Text arg0) throws Exception {
            return arg0.toString();
        }
    });

And I have below error:

 

Exception in thread "main"
org.apache.hadoop.security.AccessControlException: Permission denied:
s3n://

at
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(J
ets3tNativeFileSystemStore.java:449)

at
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(J
ets3tNativeFileSystemStore.java:427)

 

The permissions should not be the problem (because I can use ctx.textFile
without any issue), so the issue comes from the call to newAPIHadoopFile.

 

Is there anything else I need to set up for this?

 

Regards,

 

Shuai



Re: Creating Spark DataFrame from large pandas DataFrame

2015-08-20 Thread Burak Yavuz
If you would like to try using spark-csv, please use
`pyspark --packages com.databricks:spark-csv_2.11:1.2.0`

You're missing a dependency.

Best,
Burak

On Thu, Aug 20, 2015 at 1:08 PM, Charlie Hack 
wrote:

> Hi,
>
> I'm new to spark and am trying to create a Spark df from a pandas df with
> ~5 million rows. Using Spark 1.4.1.
>
> When I type:
>
> df = sqlContext.createDataFrame(pandas_df.where(pd.notnull(didf), None))
>
> (the df.where is a hack I found on the Spark JIRA to avoid a problem with
> NaN values making mixed column types)
>
> I get:
>
> TypeError: cannot create an RDD from type: 
>
> Converting a smaller pandas dataframe (~2000 rows) works fine. Anyone had
> this issue?
>
>
> This is already a workaround-- ideally I'd like to read the spark
> dataframe from a Hive table. But this is currently not an option for my
> setup.
>
> I also tried reading the data into spark from a CSV using spark-csv.
> Haven't been able to make this work as yet. I launch
>
> $ pyspark --jars path/to/spark-csv_2.11-1.2.0.jar
>
> and when I attempt to read the csv I get:
>
> Py4JJavaError: An error occurred while calling o22.load. :
> java.lang.NoClassDefFoundError: org/apache/commons/csv/CSVFormat ...
>
> Other options I can think of:
>
> - Convert my CSV to json (use Pig?) and read into Spark
> - Read in using jdbc connect from postgres
>
> But want to make sure I'm not misusing Spark or missing something obvious.
>
> Thanks!
>
> Charlie
>


Creating Spark DataFrame from large pandas DataFrame

2015-08-20 Thread Charlie Hack
Hi,

I'm new to spark and am trying to create a Spark df from a pandas df with
~5 million rows. Using Spark 1.4.1.

When I type:

df = sqlContext.createDataFrame(pandas_df.where(pd.notnull(didf), None))

(the df.where is a hack I found on the Spark JIRA to avoid a problem with
NaN values making mixed column types)

I get:

TypeError: cannot create an RDD from type: 

Converting a smaller pandas dataframe (~2000 rows) works fine. Anyone had
this issue?


This is already a workaround-- ideally I'd like to read the spark dataframe
from a Hive table. But this is currently not an option for my setup.

I also tried reading the data into spark from a CSV using spark-csv.
Haven't been able to make this work as yet. I launch

$ pyspark --jars path/to/spark-csv_2.11-1.2.0.jar

and when I attempt to read the csv I get:

Py4JJavaError: An error occurred while calling o22.load. :
java.lang.NoClassDefFoundError: org/apache/commons/csv/CSVFormat ...

Other options I can think of:

- Convert my CSV to json (use Pig?) and read into Spark
- Read in using jdbc connect from postgres

But want to make sure I'm not misusing Spark or missing something obvious.

Thanks!

Charlie


load NULL Values in RDD

2015-08-20 Thread SAHA, DEBOBROTA
Hi ,

Can anyone help me with loading a column that may or may not have NULL values 
into an RDD?


Thanks


FAILED_TO_UNCOMPRESS error from Snappy

2015-08-20 Thread Kohki Nishio
Right after upgraded to 1.4.1, we started seeing this exception and yes we
picked up snappy-java-1.1.1.7 (previously snappy-java-1.1.1.6). Is there
anything I could try ? I don't have a repro case.

org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:169)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:168)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:168)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
at
org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
at
org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
at org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58)
at
org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
at
org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1178)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
... 33 more


-- 
Kohki Nishio


Re: Data frame created from hive table and its partition

2015-08-20 Thread VIJAYAKUMAR JAWAHARLAL
Thanks Michael. My bad regarding hive table primary keys.

I have one big 140GB HDFS file and an external Hive table defined on it. The table is 
not partitioned. When I read the external Hive table using sqlContext.sql, how does 
Spark decide the number of partitions that should be created for that data frame?

The Spark UI tells me that 1000+ tasks are created to read the above mentioned 
table. I guess one task per HDFS block. Does that mean 1000+ partitions are 
created for the DF? Is there a way to (hash)partition the data frame on 
specific key column[s] when I read/load the Hive table in Spark?

Thanks,
Vijay


> On Aug 20, 2015, at 3:05 PM, Michael Armbrust  wrote:
> 
> There is no such thing as primary keys in the Hive metastore, but Spark SQL 
> does support partitioned hive tables: 
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables
>  
> 
> 
> DataFrameWriter also has a partitionBy method.
> 
> On Thu, Aug 20, 2015 at 7:29 AM, VIJAYAKUMAR JAWAHARLAL  > wrote:
> Hi
> 
> I have a question regarding data frame partition. I read a hive table from 
> spark and following spark api converts it as DF.
> 
> test_df = sqlContext.sql(“select * from hivetable1”)
> 
> How does spark decide partition of test_df? Is there a way to partition 
> test_df based on some column while reading hive table? Second question is, if 
> that hive table has primary key declared, does spark honor PK in hive table 
> and partition based on PKs?
> 
> Thanks
> Vijay
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



dataframe json schema scan

2015-08-20 Thread Alex Nastetsky
The doc for DataFrameReader#json(RDD[String]) method says

"Unless the schema is specified using schema function, this function goes
through the input once to determine the input schema."

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader

Why is this necessary? Why can't it create the dataframe at the same time
as it's determining the schema?

Thanks.
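
For what it's worth, the extra pass can be skipped by supplying a schema up
front; a sketch (the field names/types and the RDD[String] called jsonRDD are
assumptions for illustration):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

// With an explicit schema, no inference pass over jsonRDD is needed.
val df = sqlContext.read.schema(schema).json(jsonRDD)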


Re: Data frame created from hive table and its partition

2015-08-20 Thread Michael Armbrust
There is no such thing as primary keys in the Hive metastore, but Spark SQL
does support partitioned hive tables:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables

DataFrameWriter also has a partitionBy method.
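
For example, a sketch of writing a table back out partitioned by a column so
that later reads of a single key only touch that key's directory (the path and
column name are illustrative):

// Write the DataFrame partitioned by a column.
df.write.partitionBy("date").parquet("/warehouse/hivetable1_by_date")

// Reads that filter on that column can then prune to the matching directories.
val oneDay = sqlContext.read.parquet("/warehouse/hivetable1_by_date")
  .filter("date = '20140701'")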

On Thu, Aug 20, 2015 at 7:29 AM, VIJAYAKUMAR JAWAHARLAL  wrote:

> Hi
>
> I have a question regarding data frame partition. I read a hive table from
> spark and following spark api converts it as DF.
>
> test_df = sqlContext.sql(“select * from hivetable1”)
>
> How does spark decide partition of test_df? Is there a way to partition
> test_df based on some column while reading hive table? Second question is,
> if that hive table has primary key declared, does spark honor PK in hive
> table and partition based on PKs?
>
> Thanks
> Vijay
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: DataFrameWriter.jdbc is very slow

2015-08-20 Thread Michael Armbrust
We will probably fix this in Spark 1.6

https://issues.apache.org/jira/browse/SPARK-10040

On Thu, Aug 20, 2015 at 5:18 AM, Aram Mkrtchyan  wrote:

> We want to migrate our data (approximately 20M rows) from Parquet to Postgres.
> When we use the DataFrame writer's jdbc method, the execution time is very
> large; we have tried the same with batch inserts and it was much more effective.
> Is it intentionally implemented that way?
>


Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Davies Liu
As Aram said, there two options in Spark 1.4,

1) Use the HiveContext, then you got datediff from Hive,
df.selectExpr("datediff(d2, d1)")
2) Use Python UDF:
```
>>> from datetime import date
>>> df = sqlContext.createDataFrame([(date(2008, 8, 18), date(2008, 9, 26))], 
>>> ['d1', 'd2'])
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType
>>> diff = udf(lambda a, b: (a - b).days, IntegerType())
>>> df.select(diff(df.d1, df.d2)).show()
+-+
|PythonUDF#(d1,d2)|
+-+
|  -39|
+-+
```

On Thu, Aug 20, 2015 at 7:45 AM, Aram Mkrtchyan
 wrote:
> Hi,
>
> hope this will help you
>
> import org.apache.spark.sql.functions._
> import sqlContext.implicits._
> import java.sql.Timestamp
> import org.joda.time.{DateTime, Days}  // needed for Days.daysBetween below
>
> val df = sc.parallelize(Array((date1, date2))).toDF("day1", "day2")
>
> val dateDiff = udf[Long, Timestamp, Timestamp]((value1, value2) =>
>   Days.daysBetween(new DateTime(value2.getTime), new
> DateTime(value1.getTime)).getDays)
> df.withColumn("diff", dateDiff(df("day2"), df("day1"))).show()
>
> or you can write sql query using hiveql's datediff function.
>  https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
>
> On Thu, Aug 20, 2015 at 4:57 PM, Dhaval Patel  wrote:
>>
>> More update on this question..I am using spark 1.4.1.
>>
>> I was just reading documentation of spark 1.5 (still in development) and I
>> think there will be a new func *datediff* that will solve the issue. So
>> please let me know if there is any work-around until spark 1.5 is out :).
>>
>> pyspark.sql.functions.datediff(end, start)[source]
>>
>> Returns the number of days from start to end.
>>
>> >>> df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1',
>> >>> 'd2'])
>> >>> df.select(datediff(df.d2, df.d1).alias('diff')).collect()
>> [Row(diff=32)]
>>
>> New in version 1.5.
>>
>>
>> On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel 
>> wrote:
>>>
>>> Apologies, sent too early accidentally. Actual message is below
>>> 
>>>
>>> A dataframe has 2 datecolumns (datetime type) and I would like to add
>>> another column that would have difference between these two dates. Dataframe
>>> snippet is below.
>>>
>>> new_df.show(5)
>>> +---+--+--+
>>> | PATID| SVCDATE|next_diag_date|
>>> +---+--+--+
>>> |12345655545|2012-02-13| 2012-02-13|
>>> |12345655545|2012-02-13| 2012-02-13|
>>> |12345655545|2012-02-13| 2012-02-27|
>>> +---+--+--+
>>>
>>>
>>>
>>> Here is what I have tried so far:
>>>
>>> -> new_df.withColumn('SVCDATE2',
>>> (new_df.next_diag_date-new_df.SVCDATE)).show()
>>> Error: DateType does not support numeric operations
>>>
>>> -> new_df.withColumn('SVCDATE2',
>>> (new_df.next_diag_date-new_df.SVCDATE).days).show()
>>> Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);
>>>
>>>
>>> However this simple python code works fine with pySpark:
>>>
>>> from datetime import date
>>> d0 = date(2008, 8, 18)
>>> d1 = date(2008, 9, 26)
>>> delta = d0 - d1
>>> print (d0 - d1).days
>>>
>>> # -39
>>>
>>>
>>> Any suggestions would be appreciated! Also is there a way to add a new
>>> column in dataframe without using column expression (e.g. like in pandas or
>>> R. df$new_col = 'new col value')?
>>>
>>>
>>> Thanks,
>>> Dhaval
>>>
>>>
>>>
>>> On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel 
>>> wrote:

 new_df.withColumn('SVCDATE2',
 (new_df.next_diag_date-new_df.SVCDATE).days).show()

 +---+--+--+ | PATID| SVCDATE|next_diag_date|
 +---+--+--+ |12345655545|2012-02-13| 
 2012-02-13|
 |12345655545|2012-02-13| 2012-02-13| |12345655545|2012-02-13| 2012-02-27|
 +---+--+--+
>>>
>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to list all dataframes and RDDs available in current session?

2015-08-20 Thread Rishitesh Mishra
I am not sure if you can view all RDDs in a session. Tables are maintained
in a catalogue, hence it's easier for them. However, you can see the DAG
representation, which lists all the RDDs in a job, in the Spark UI.
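For what it's worth, the two things that are queryable from a Scala shell or the
Zeppelin Scala interpreter are the registered temp tables (via the SQLContext
catalogue) and the RDDs that were explicitly persisted; un-cached intermediate
RDDs are not tracked anywhere you can list. A minimal sketch, assuming the usual
sc / sqlContext bindings:

```scala
// temp tables registered in this session (same information as SQL's "show tables")
sqlContext.tableNames().foreach(println)

// RDDs that were explicitly cached/persisted in this SparkContext
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id: ${rdd.name} (${rdd.getStorageLevel})")
}
```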
On 20 Aug 2015 22:34, "Dhaval Patel"  wrote:

> Apologies
>
> I accidentally included Spark User DL on BCC. The actual email message is
> below.
> =
>
>
> Hi:
>
> I have been working on few example using zeppelin.
>
> I have been trying to find a command that would list all *dataframes/RDDs*
> that has been created in current session. Anyone knows if there is any such
> commands available?
>
> Something similar to SparkSQL to list all temp tables :
>   show tables;
>
> Thanks,
> Dhaval
>
>
>
> On Thu, Aug 20, 2015 at 12:49 PM, Dhaval Patel 
> wrote:
>
>> Hi:
>>
>> I have been working on few example using zeppelin.
>>
>> I have been trying to find a command that would list all
>> *dataframes/RDDs* that has been created in current session. Anyone knows if
>> there is any such commands available?
>>
>> Something similar to SparkSQL to list all temp tables :
>>   show tables;
>>
>> Thanks,
>> Dhaval
>>
>
>


Re: DAG related query

2015-08-20 Thread Andrew Or
Hi Bahubali,

Once RDDs are created, they are immutable (in most cases). In your case you
end up with 3 RDDs:

(1) the original rdd1 that reads from the text file
(2) rdd2, that applies a map function on (1), and
(3) the new rdd1 that applies a map function on (2)

There's no cycle because you have 3 distinct RDDs. All you're doing is
reassigning a reference `rdd1`, but the underlying RDD doesn't change.

-Andrew
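(A minimal Scala sketch of the same point, using vals/vars in place of the
JavaRDD calls from the question; the input path is illustrative:)

```scala
var rdd1 = sc.textFile("input.txt")   // RDD #1
val rdd2 = rdd1.map(_.toUpperCase)    // RDD #2, built on #1
rdd1 = rdd2.map(_.trim)               // RDD #3, built on #2; only the local name is re-pointed

// The lineage is a straight chain #1 -> #2 -> #3, no cycle. Reassigning the
// variable never mutates RDD #1; it just drops our reference to it.
println(rdd1.toDebugString)
```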

2015-08-20 6:21 GMT-07:00 Sean Owen :

> No. The third line creates a third RDD whose reference simply replaces
> the reference to the first RDD in your local driver program. The first
> RDD still exists.
>
> On Thu, Aug 20, 2015 at 2:15 PM, Bahubali Jain  wrote:
> > Hi,
> > How would the DAG look like for the below code
> >
> > JavaRDD rdd1 = context.textFile();
> > JavaRDD rdd2 = rdd1.map();
> > rdd1 =  rdd2.map();
> >
> > Does this lead to any kind of cycle?
> >
> > Thanks,
> > Baahu
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: [survey] [spark-ec2] What do you like/dislike about spark-ec2?

2015-08-20 Thread Nicholas Chammas
I'm planning to close the survey to further responses early next week.

If you haven't chimed in yet, the link to the survey is here:

http://goo.gl/forms/erct2s6KRR

We already have some great responses, which you can view. I'll share a
summary after the survey is closed.

Cheers!

Nick


On Mon, Aug 17, 2015 at 11:09 AM Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Howdy folks!
>
> I’m interested in hearing about what people think of spark-ec2
>  outside of the
> formal JIRA process. Your answers will all be anonymous and public.
>
> If the embedded form below doesn’t work for you, you can use this link to
> get the same survey:
>
> http://goo.gl/forms/erct2s6KRR
>
> Cheers!
> Nick
> ​
>


Re: Windowed stream operations -- These are too lazy for some use cases

2015-08-20 Thread Justin Grimes
I tried something like that. When I tried just doing count() on the
DStream, it didn't seem like it was actually forcing the computation.

What (sort of) worked was doing a forEachRDD((rdd) => rdd.count()), or
doing a print() on the DStream. The only problem was this seemed to add a
lot of processing overhead -- I couldn't figure out exactly why but it
seemed to have something to do with forEachRDD only being executed on the
driver.
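For what it's worth, that behaviour is consistent with DStream.count() being a
transformation (it returns a new DStream[Long]) rather than an output operation,
so on its own it schedules nothing. foreachRDD is an output operation: the
closure you pass runs on the driver once per batch, but the rdd.count() it calls
is still a normal distributed action executed on the executors. A minimal sketch
of that forcing pattern, reusing Iulian's firstStep name:

```scala
firstStep.foreachRDD { rdd =>
  // this callback runs on the driver once per batch interval; the count itself
  // runs as a distributed job on the executors and forces firstStep
  rdd.count()
}
```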

On Thu, Aug 20, 2015 at 1:39 PM, Iulian Dragoș 
wrote:

> On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes  wrote:
>
> We are aggregating real time logs of events, and want to do windows of 30
>> minutes. However, since the computation doesn't start until 30 minutes have
>> passed, there is a ton of data built up that processing could've already
>> started on. When it comes time to actually process the data, there is too
>> much for our cluster to handle at once.
>>
>> The basic idea is this:
>>
>>  val mergedMain = mergedStream
>>   .flatMap(r => ) // denormalize data for this particular output
>> stream
>>   .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) //
>> this would sum over the batches
>>
> Could you add a dummy action at this point?
>
> val firstStep = mergedStream
>   .flatMap(r => ) // denormalize data for this particular output 
> stream
>   .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this 
> would sum over the batches
>   .persist() // this will be reused in windowing operations
>
> firstStep.count() // just to trigger computation
>
> firstStep
>   .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => 
> sumAggregates(x,y), 180, 180) // sum over the windows
>   .map(rec => ...) // convert data to other format
>   .foreachRDD{ (rdd, time) =>
> rdd.saveAsTextFile(...) // save to text files
>   }
>
>   .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) =>
>> sumAggregates(x,y), 180, 180) // sum over the windows
>>   .map(rec => ...) // convert data to other format
>>   .foreachRDD{ (rdd, time) =>
>> rdd.saveAsTextFile(...) // save to text files
>>   }
>>
>> I would want the batches to be reduced as soon as they arrive (the first
>> reduceByKey), since there isn't any reason to wait. Instead all of the
>> unprocessed data has to be processed at the same time (this data is being
>> heavily denormalized in some cases, and so generates a bunch of additional
>> data).
>>
>> Thanks for any help.
>>
> ​
> --
>
> --
> Iulian Dragos
>
> --
> Reactive Apps on the JVM
> www.typesafe.com
>
>


Re: Windowed stream operations -- These are too lazy for some use cases

2015-08-20 Thread Iulian Dragoș
On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes  wrote:

We are aggregating real time logs of events, and want to do windows of 30
> minutes. However, since the computation doesn't start until 30 minutes have
> passed, there is a ton of data built up that processing could've already
> started on. When it comes time to actually process the data, there is too
> much for our cluster to handle at once.
>
> The basic idea is this:
>
>  val mergedMain = mergedStream
>   .flatMap(r => ) // denormalize data for this particular output
> stream
>   .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) //
> this would sum over the batches
>
Could you add a dummy action at this point?

val firstStep = mergedStream
  .flatMap(r => ) // denormalize data for this particular output stream
  .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y))
// this would sum over the batches
  .persist() // this will be reused in windowing operations

firstStep.count() // just to trigger computation

firstStep
  .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) =>
sumAggregates(x,y), 180, 180) // sum over the windows
  .map(rec => ...) // convert data to other format
  .foreachRDD{ (rdd, time) =>
rdd.saveAsTextFile(...) // save to text files
  }

  .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) =>
> sumAggregates(x,y), 180, 180) // sum over the windows
>   .map(rec => ...) // convert data to other format
>   .foreachRDD{ (rdd, time) =>
> rdd.saveAsTextFile(...) // save to text files
>   }
>
> I would want the batches to be reduced as soon as they arrive (the first
> reduceByKey), since there isn't any reason to wait. Instead all of the
> unprocessed data has to be processed at the same time (this data is being
> heavily denormalized in some cases, and so generates a bunch of additional
> data).
>
> Thanks for any help.
>
​
-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Run scala code with spark submit

2015-08-20 Thread Dean Wampler
I haven't tried it, but spark-shell should work if you give it a Scala
"script" file, since it's basically a wrapper around the Scala REPL.

dean
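For example, a self-contained one-file word count along these lines (the file
name, input path and contents are purely illustrative) can be fed straight to
the REPL with `spark-shell -i Greeting.scala`; to use spark-submit instead you
would wrap it in an object with a main method and package it into a jar with
`sbt package` or maven:

```scala
// Greeting.scala, run with: spark-shell -i Greeting.scala
// spark-shell already provides the SparkContext as `sc`
val lines = sc.textFile("greetings.txt")

val counts = lines
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.collect().foreach { case (word, n) => println(s"$word: $n") }

sys.exit(0) // otherwise the shell stays open after the script finishes
```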

On Thursday, August 20, 2015, MasterSergius 
wrote:

> Is there any possibility to run standalone scala program via spark submit?
> Or
> I have always put it in some packages, build it with maven (or sbt)?
>
> What if I have just simple program, like that example word counter?
> Could anyone please, show it on this simple test file Greeting.scala:
>
>
>
> It comiles with scalac, runs with scala. Now I want to run in with spark (I
> can get these files via wget, for example)
>
>
>
>
>
> So, how I can run via spark-submit one-filer scala program?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Run-scala-code-with-spark-submit-tp24367.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
>
>

-- 
Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com


Run scala code with spark submit

2015-08-20 Thread MasterSergius
Is there any possibility to run a standalone Scala program via spark-submit? Or
do I always have to put it in a package and build it with maven (or sbt)?

What if I have just a simple program, like the example word counter?
Could anyone please show it on this simple test file, Greeting.scala:



It compiles with scalac and runs with scala. Now I want to run it with Spark (I
can get these files via wget, for example)





So, how can I run a one-file Scala program via spark-submit?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Run-scala-code-with-spark-submit-tp24367.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to list all dataframes and RDDs available in current session?

2015-08-20 Thread Dhaval Patel
Apologies

I accidentally included Spark User DL on BCC. The actual email message is
below.
=


Hi:

I have been working on a few examples using Zeppelin.

I have been trying to find a command that would list all *dataframes/RDDs*
that have been created in the current session. Does anyone know if there is
any such command available?

Something similar to SparkSQL to list all temp tables :
  show tables;

Thanks,
Dhaval



On Thu, Aug 20, 2015 at 12:49 PM, Dhaval Patel  wrote:

> Hi:
>
> I have been working on few example using zeppelin.
>
> I have been trying to find a command that would list all *dataframes/RDDs*
> that has been created in current session. Anyone knows if there is any such
> commands available?
>
> Something similar to SparkSQL to list all temp tables :
>   show tables;
>
> Thanks,
> Dhaval
>


Windowed stream operations -- These are too lazy for some use cases

2015-08-20 Thread Justin Grimes
We are aggregating real time logs of events, and want to do windows of 30
minutes. However, since the computation doesn't start until 30 minutes have
passed, there is a ton of data built up that processing could've already
started on. When it comes time to actually process the data, there is too
much for our cluster to handle at once.

The basic idea is this:

 val mergedMain = mergedStream
  .flatMap(r => ) // denormalize data for this particular output
stream
  .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) //
this would sum over the batches
  .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) =>
sumAggregates(x,y), 180, 180) // sum over the windows
  .map(rec => ...) // convert data to other format
  .foreachRDD{ (rdd, time) =>
rdd.saveAsTextFile(...) // save to text files
  }

I would want the batches to be reduced as soon as they arrive (the first
reduceByKey), since there isn't any reason to wait. Instead all of the
unprocessed data has to be processed at the same time (this data is being
heavily denormalized in some cases, and so generates a bunch of additional
data).

Thanks for any help.


How to list all dataframes and RDDs available in current session?

2015-08-20 Thread Dhaval Patel
Hi:

I have been working on a few examples using Zeppelin.

I have been trying to find a command that would list all *dataframes/RDDs*
that have been created in the current session. Does anyone know if there is
any such command available?

Something similar to SparkSQL to list all temp tables :
  show tables;

Thanks,
Dhaval


Re: How to get the radius of clusters in spark K means

2015-08-20 Thread Ashen Weerathunga
Okay, thanks.
I already did that and wanted to check whether there is any other method to
extract it from the model itself. Thanks again for the help.
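(For reference, a minimal Scala sketch of the roll-your-own approach Robin
describes below: assign each point to its cluster, compute its Euclidean
distance to that centre, and take the per-cluster maximum. The names data and
model are illustrative.)

```scala
import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

def clusterRadii(model: KMeansModel, data: RDD[Vector]): Map[Int, Double] =
  data.map { point =>
    val cluster = model.predict(point) // index of the closest centre
    val dist = math.sqrt(Vectors.sqdist(point, model.clusterCenters(cluster)))
    (cluster, dist)
  }.reduceByKey((a, b) => math.max(a, b)) // radius = farthest member of each cluster
   .collectAsMap()
   .toMap
```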

On Thu, Aug 20, 2015 at 8:39 PM, Robin East  wrote:

> There is no cluster radius method on the model returned from K-means.
> You’ll need to roll it yourself by generating the distance from each point
> in the cluster to the cluster center itself and then take the max.
>
>
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/malak/
>
> On 20 Aug 2015, at 07:14, ashensw  wrote:
>
> We can get cluster centers in K means clustering. Like wise is there any
> method in spark to get the cluster radius?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-radius-of-clusters-in-spark-K-means-tp24353.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


-- 
*Ashen Weerathunga*
Software Engineer - Intern
WSO2 Inc.: http://wso2.com
lean.enterprise.middleware

Email: as...@wso2.com
Mobile: +94 716042995 <94716042995>
LinkedIn:
*http://lk.linkedin.com/in/ashenweerathunga
*


Re: SparkSQL concerning materials

2015-08-20 Thread Dhaval Patel
Or if you're a python lover then this is a good place -
https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html#



On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu  wrote:

> See also
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package
>
> Cheers
>
> On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif 
> wrote:
>
>> Hi Dawid
>>
>> The best pace to get started is the Spark SQL Guide from Apache
>> http://spark.apache.org/docs/latest/sql-programming-guide.html
>>
>> Regards
>> Muhammad
>>
>> On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz <
>> wysakowicz.da...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I would like to dip into SparkSQL. Get to know better the architecture,
>>> good practices, some internals. Could you advise me some materials on this
>>> matter?
>>>
>>> Regards
>>> Dawid
>>>
>>
>>
>


MLlib Prefixspan implementation

2015-08-20 Thread alexis GILLAIN
I want to use prefixspan so I had a look at the code and the cited paper :
"Distributed PrefixSpan Algorithm Based on MapReduce".

There is a result in the paper I didn't really understand, and I couldn't
find where it is used in the code.

Suppose a sequence database S = {­1­,2...­n}, a sequence  is a
length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a
prefix of a length-(L-1) sequential pattern , when the support count
of  is not less than min_support, it is equal to obtaining a length-L
sequential pattern < a ... a > from projected databases that obtaining a
length-L sequential pattern < a ... a > from a sequence database S.

According to the paper, it's supposed to add a pruning step in the reduce
function, but I couldn't find where.

This result seems to come from a previous paper : "Wang Linlin, Fan Jun.
Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan [J].
Computer Engineering, 2009, 35(23): 56-61" but it didn't help me to
understand it and how it can improve the algorithm.


Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Umesh Kacha
Hi, where do I see GC time in the UI? I have set spark.yarn.executor.memoryOverhead
to 3500, which I believe should be good enough. So you mean only GC could
be the reason behind the timeout? I checked the YARN logs and did not see any GC
errors there. Please guide. Thanks much.

On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza  wrote:

> Moving this back onto user@
>
> Regarding GC, can you look in the web UI and see whether the "GC time"
> metric dominates the amount of time spent on each task (or at least the
> tasks that aren't completing)?
>
> Also, have you tried bumping your spark.yarn.executor.memoryOverhead?
> YARN may be killing your executors for using too much off-heap space.  You
> can see whether this is happening by looking in the Spark AM or YARN
> NodeManager logs.
>
> -Sandy
>
> On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha 
> wrote:
>
>> Hi thanks much for the response. Yes I tried default settings too 0.2 it
>> was also going into timeout if it is spending time in GC then why it is not
>> throwing GC error I don't see any such error. Yarn logs are not helpful at
>> all. What is tungsten how do I use it? Spark is doing great I believe my
>> job runs successfully and 60% tasks completes only after first executor
>> gets lost things are messing.
>> On Aug 20, 2015 7:59 PM, "Sandy Ryza"  wrote:
>>
>>> What sounds most likely is that you're hitting heavy garbage
>>> collection.  Did you hit issues when the shuffle memory fraction was at its
>>> default of 0.2?  A potential danger with setting the shuffle storage to 0.7
>>> is that it allows shuffle objects to get into the GC old generation, which
>>> triggers more stop-the-world garbage collections.
>>>
>>> Have you tried enabling Tungsten / unsafe?
>>>
>>> Unfortunately, Spark is still not that great at dealing with
>>> heavily-skewed shuffle data, because its reduce-side aggregation still
>>> operates on Java objects instead of binary data.
>>>
>>> -Sandy
>>>
>>> On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha 
>>> wrote:
>>>
 Hi Sandy thanks much for the response. I am using Spark 1.4.1 and I
 have set spark.shuffle.storage as 0.7 as my spark job involves 4 groupby
 queries executed using hiveContext.sql my data set is skewed so will be
 more shuffling I believe I don't know what's wrong spark job runs fine for
 almost an hour and when shuffle read shuffle write column in UI starts to
 show more than 10 gb executor starts to getting lost because of timeout and
 slowly other executor starts getting lost. Please guide.
 On Aug 20, 2015 7:38 PM, "Sandy Ryza"  wrote:

> What version of Spark are you using?  Have you set any shuffle configs?
>
> On Wed, Aug 19, 2015 at 11:46 AM, unk1102 
> wrote:
>
>> I have one Spark job which seems to run fine but after one hour or so
>> executor start getting lost because of time out something like the
>> following
>> error
>>
>> cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds
>> 60 seconds
>>
>> and because of above error couple of chained errors starts to come
>> like
>> FetchFailedException, Rpc client disassociated, Connection reset by
>> peer,
>> IOException etc
>>
>> Please see the following UI page I have noticed when shuffle
>> read/write
>> starts to increase more than 10 GB executors starts getting lost
>> because of
>> timeout. How do I clear this stacked memory of 10 GB in shuffle
>> read/write
>> section I dont cache anything why Spark is not clearing those memory.
>> Please
>> guide.
>>
>> IMG_20150819_231418358.jpg
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
>> >
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>>>
>


Re: SparkSQL concerning materials

2015-08-20 Thread Ted Yu
See also
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package

Cheers

On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif 
wrote:

> Hi Dawid
>
> The best pace to get started is the Spark SQL Guide from Apache
> http://spark.apache.org/docs/latest/sql-programming-guide.html
>
> Regards
> Muhammad
>
> On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz <
> wysakowicz.da...@gmail.com> wrote:
>
>> Hi,
>>
>> I would like to dip into SparkSQL. Get to know better the architecture,
>> good practices, some internals. Could you advise me some materials on this
>> matter?
>>
>> Regards
>> Dawid
>>
>
>


Re: SparkSQL concerning materials

2015-08-20 Thread Muhammad Atif
Hi Dawid

The best pace to get started is the Spark SQL Guide from Apache
http://spark.apache.org/docs/latest/sql-programming-guide.html

Regards
Muhammad

On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz <
wysakowicz.da...@gmail.com> wrote:

> Hi,
>
> I would like to dip into SparkSQL. Get to know better the architecture,
> good practices, some internals. Could you advise me some materials on this
> matter?
>
> Regards
> Dawid
>


Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Aram Mkrtchyan
Hi,

hope this will help you

import org.apache.spark.sql.functions._
import sqlContext.implicits._
import java.sql.Timestamp

val df = sc.parallelize(Array((date1, date2))).toDF("day1", "day2")

val dateDiff = udf[Long, Timestamp, Timestamp]((value1, value2) =>
  Days.daysBetween(new DateTime(value2.getTime), new
DateTime(value1.getTime)).getDays)
df.withColumn("diff", dateDiff(df("day2"), df("day1"))).show()

or you can write sql query using hiveql's datediff function.
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

On Thu, Aug 20, 2015 at 4:57 PM, Dhaval Patel  wrote:

> More update on this question..I am using spark 1.4.1.
>
> I was just reading documentation of spark 1.5 (still in development) and I
> think there will be a new func *datediff* that will solve the issue. So
> please let me know if there is any work-around until spark 1.5 is out :).
>
> pyspark.sql.functions.datediff(*end*, *start*)[source]
> 
> 
>
> Returns the number of days from start to end.
>
> >>> df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 
> >>> 'd2'])>>> df.select(datediff(df.d2, 
> >>> df.d1).alias('diff')).collect()[Row(diff=32)]
>
> New in version 1.5.
>
> On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel 
> wrote:
>
>> Apologies, sent too early accidentally. Actual message is below
>> 
>>
>> A dataframe has 2 datecolumns (datetime type) and I would like to add
>> another column that would have difference between these two dates.
>> Dataframe snippet is below.
>>
>> new_df.show(5)
>> +---+--+--+
>> | PATID| SVCDATE|next_diag_date|
>> +---+--+--+
>> |12345655545|2012-02-13| 2012-02-13|
>> |12345655545|2012-02-13| 2012-02-13|
>> |12345655545|2012-02-13| 2012-02-27|
>> +---+--+--+
>>
>>
>>
>> Here is what I have tried so far:
>>
>> -> new_df.withColumn('SVCDATE2',
>> (new_df.next_diag_date-new_df.SVCDATE)).show()
>> Error: DateType does not support numeric operations
>>
>> -> new_df.withColumn('SVCDATE2',
>> (new_df.next_diag_date-new_df.SVCDATE).days).show()
>> Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);
>>
>>
>> However this simple python code works fine with pySpark:
>>
>> from datetime import date
>> d0 = date(2008, 8, 18)
>> d1 = date(2008, 9, 26)
>> delta = d0 - d1
>> print (d0 - d1).days
>>
>> # -39
>>
>>
>> Any suggestions would be appreciated! Also is there a way to add a new
>> column in dataframe without using column expression (e.g. like in pandas or
>> R. df$new_col = 'new col value')?
>>
>>
>> Thanks,
>> Dhaval
>>
>>
>>
>> On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel 
>> wrote:
>>
>>> new_df.withColumn('SVCDATE2',
>>> (new_df.next_diag_date-new_df.SVCDATE).days).show()
>>>
>>> +---+--+--+ | PATID| SVCDATE|next_diag_date|
>>> +---+--+--+ |12345655545|2012-02-13|
>>> 2012-02-13| |12345655545|2012-02-13| 2012-02-13| |12345655545|2012-02-13|
>>> 2012-02-27| +---+--+--+
>>>
>>
>>
>


Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Sandy Ryza
Moving this back onto user@

Regarding GC, can you look in the web UI and see whether the "GC time"
metric dominates the amount of time spent on each task (or at least the
tasks that aren't completing)?

Also, have you tried bumping your spark.yarn.executor.memoryOverhead?  YARN
may be killing your executors for using too much off-heap space.  You can
see whether this is happening by looking in the Spark AM or YARN
NodeManager logs.

-Sandy
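For reference, a sketch of bumping that overhead (the 2048 MB figure is purely
illustrative; the same setting can be passed on the command line as
--conf spark.yarn.executor.memoryOverhead=2048 with spark-submit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("skewed-shuffle-job") // illustrative name
  // extra off-heap headroom YARN grants each executor, in MB,
  // on top of spark.executor.memory
  .set("spark.yarn.executor.memoryOverhead", "2048")

val sc = new SparkContext(conf)
```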

On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha  wrote:

> Hi thanks much for the response. Yes I tried default settings too 0.2 it
> was also going into timeout if it is spending time in GC then why it is not
> throwing GC error I don't see any such error. Yarn logs are not helpful at
> all. What is tungsten how do I use it? Spark is doing great I believe my
> job runs successfully and 60% tasks completes only after first executor
> gets lost things are messing.
> On Aug 20, 2015 7:59 PM, "Sandy Ryza"  wrote:
>
>> What sounds most likely is that you're hitting heavy garbage collection.
>> Did you hit issues when the shuffle memory fraction was at its default of
>> 0.2?  A potential danger with setting the shuffle storage to 0.7 is that it
>> allows shuffle objects to get into the GC old generation, which triggers
>> more stop-the-world garbage collections.
>>
>> Have you tried enabling Tungsten / unsafe?
>>
>> Unfortunately, Spark is still not that great at dealing with
>> heavily-skewed shuffle data, because its reduce-side aggregation still
>> operates on Java objects instead of binary data.
>>
>> -Sandy
>>
>> On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha 
>> wrote:
>>
>>> Hi Sandy thanks much for the response. I am using Spark 1.4.1 and I have
>>> set spark.shuffle.storage as 0.7 as my spark job involves 4 groupby queries
>>> executed using hiveContext.sql my data set is skewed so will be more
>>> shuffling I believe I don't know what's wrong spark job runs fine for
>>> almost an hour and when shuffle read shuffle write column in UI starts to
>>> show more than 10 gb executor starts to getting lost because of timeout and
>>> slowly other executor starts getting lost. Please guide.
>>> On Aug 20, 2015 7:38 PM, "Sandy Ryza"  wrote:
>>>
 What version of Spark are you using?  Have you set any shuffle configs?

 On Wed, Aug 19, 2015 at 11:46 AM, unk1102 
 wrote:

> I have one Spark job which seems to run fine but after one hour or so
> executor start getting lost because of time out something like the
> following
> error
>
> cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds
> 60 seconds
>
> and because of above error couple of chained errors starts to come like
> FetchFailedException, Rpc client disassociated, Connection reset by
> peer,
> IOException etc
>
> Please see the following UI page I have noticed when shuffle read/write
> starts to increase more than 10 GB executors starts getting lost
> because of
> timeout. How do I clear this stacked memory of 10 GB in shuffle
> read/write
> section I dont cache anything why Spark is not clearing those memory.
> Please
> guide.
>
> IMG_20150819_231418358.jpg
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
> >
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

>>


Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-20 Thread satish chandra j
Hi All,
Could anybody let me know what it is that I am missing here? It should work, as
it's a basic transformation.

Please let me know if any additional information required

Regards,
Satish

On Thu, Aug 20, 2015 at 3:35 PM, satish chandra j 
wrote:

> HI All,
> I have data in RDD as mentioned below:
>
> RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
>
>
> I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on
> Values for each key
>
> Code:
> RDD.reduceByKey((x,y) => x+y)
> RDD.take(3)
>
> Result in console:
> RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at
> :73
> res:Array[(Int,Int)] = Array()
>
> Command as mentioned
>
> dse spark --master local --jars postgresql-9.4-1201.jar -i  
>
>
> Please let me know what is missing in my code, as my resultant Array is
> empty
>
>
>
> Regards,
> Satish
>
>


Data frame created from hive table and its partition

2015-08-20 Thread VIJAYAKUMAR JAWAHARLAL
Hi 

I have a question regarding DataFrame partitioning. I read a Hive table from
Spark, and the following Spark API call converts it into a DataFrame.

test_df = sqlContext.sql("select * from hivetable1")

How does Spark decide the partitioning of test_df? Is there a way to partition test_df
based on some column while reading the Hive table? The second question is: if that Hive
table has a primary key declared, does Spark honor the PK in the Hive table and partition
based on it?

Thanks
Vijay
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark.sql.shuffle.partitions=1 seems to be working fine but creates timeout for large skewed data

2015-08-20 Thread Umesh Kacha
Hi Hemant, sorry for the confusion. I meant the final output part files in the
final HDFS directory, never the intermediate files. Thanks. My goal is
to reduce that many files because of my use case explained in the first
email with calculations.
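A common middle ground, rather than forcing spark.sql.shuffle.partitions=1, is
to keep the shuffle itself wide and only collapse the parallelism for the final
write. A minimal Scala sketch (the query, target file count and output path are
illustrative; coalesce, where available, avoids the extra shuffle that
repartition adds):

```scala
// illustrative aggregation over a skewed, partitioned Hive table
val result = hiveContext.sql(
  "select col1, count(*) as cnt from some_table group by col1")

result
  .repartition(10)                 // ~10 part files per write instead of 200
  .write
  .mode("append")
  .parquet("/user/out/aggregated") // illustrative output path
```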
On Aug 20, 2015 5:59 PM, "Hemant Bhanawat"  wrote:

> Sorry, I misread your mail. Thanks for pointing that out.
>
> BTW, are the 800,000 files shuffle intermediate output and not the final
> output? I assume yes. I didn't know that you can keep intermediate output
> on HDFS and I don't think that is recommended.
>
>
>
>
> On Thu, Aug 20, 2015 at 2:43 PM, Hemant Bhanawat 
> wrote:
>
>> Looks like you are using hash based shuffling and not sort based
>> shuffling which creates a single file per maptask.
>>
>> On Thu, Aug 20, 2015 at 12:43 AM, unk1102  wrote:
>>
>>> Hi I have a Spark job which deals with large skewed dataset. I have
>>> around
>>> 1000 Hive partitions to process in four different tables every day. So
>>> if I
>>> go with 200 spark.sql.shuffle.partitions default partitions created by
>>> Spark
>>> I end up with 4 * 1000 * 200 = 800,000 small small files in HDFS which
>>> won't be
>>> good for HDFS name node I have been told if you keep on creating such
>>> large
>>> no of small small files namenode will crash is it true? please help me
>>> understand. Anyways so to avoid creating small files I did set
>>> spark.sql.shuffle.partitions=1 it seems to be creating 1 output file and
>>> as
>>> per my understanding because of only one output there is so much
>>> shuffling
>>> to do to bring all data to once reducer please correct me if I am wrong.
>>> This is causing memory/timeout issues how do I deal with it
>>>
>>> I tried to give spark.shuffle.storage=0.7 also still this memory seems
>>> not
>>> enough for it. I have 25 gb executor with 4 cores and 20 such executors
>>> still Spark job fails please guide.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-shuffle-partitions-1-seems-to-be-working-fine-but-creates-timeout-for-large-skewed-data-tp24346.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Convert mllib.linalg.Matrix to Breeze

2015-08-20 Thread Burak Yavuz
Matrix.toBreeze is a private method. MLlib matrices have the same structure
as Breeze matrices. Just create a new Breeze matrix directly, along the lines of
the sketch below.
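(A minimal sketch of that construction: MLlib stores values column-major, just
like Breeze, so the raw array transfers directly. Note this densifies a
SparseMatrix, so it is mainly meant for dense data.)

```scala
import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.mllib.linalg.Matrix

def toBreeze(m: Matrix): BDM[Double] =
  new BDM[Double](m.numRows, m.numCols, m.toArray) // column-major data
```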

Best,
Burak


On Thu, Aug 20, 2015 at 3:28 AM, Yanbo Liang  wrote:

> You can use Matrix.toBreeze()
> 
>  .
>
> 2015-08-20 18:24 GMT+08:00 Naveen :
>
>> Hi All,
>>
>> Is there anyway to convert a mllib matrix to a Dense Matrix of Breeze?
>> Any leads are appreciated.
>>
>>
>> Thanks,
>> Naveen
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Sandy Ryza
What version of Spark are you using?  Have you set any shuffle configs?

On Wed, Aug 19, 2015 at 11:46 AM, unk1102  wrote:

> I have one Spark job which seems to run fine but after one hour or so
> executor start getting lost because of time out something like the
> following
> error
>
> cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds
> 60 seconds
>
> and because of above error couple of chained errors starts to come like
> FetchFailedException, Rpc client disassociated, Connection reset by peer,
> IOException etc
>
> Please see the following UI page I have noticed when shuffle read/write
> starts to increase more than 10 GB executors starts getting lost because of
> timeout. How do I clear this stacked memory of 10 GB in shuffle read/write
> section I dont cache anything why Spark is not clearing those memory.
> Please
> guide.
>
> IMG_20150819_231418358.jpg
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
> >
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


PySpark concurrent jobs using single SparkContext

2015-08-20 Thread Mike Sukmanowsky
Hi all,

We're using Spark 1.3.0 via a small YARN cluster to do some log processing.
The jobs are pretty simple, for a number of customers and a number of days,
fetch some event log data, build aggregates and store those aggregates into
a data store.

The way our script is written right now does something akin to:

with SparkContext() as sc:
for customer in customers:
for day in days:
logs = sc.textFile(get_logs(customer, day))
aggregate = make_aggregate(logs)
# This function contains the action saveAsNewAPIHadoopFile which
# triggers a save
save_aggregate(aggregate)

​
So we have a Spark job per customer, per day.

I tried doing some parallel job submission with something similar to:

def make_and_save_aggregate(customer, day, spark_context):
# Without a separate threading.Lock() here or better yet, one guarding the
# Spark context, multiple customer/day transformations and actions could
# be interweaved
sc = spark_context
logs = sc.textFile(get_logs(customer, day))
aggregate = make_aggregate(logs)
save_aggregate(aggregate)
with SparkContext() as sc, futures.ThreadPoolExecutor(4) as executor:
for customer in customers:
for day in days:
executor.submit(make_and_save_aggregate, customer, day, sc)

​
The problem is, with no locks on a SparkContext except during initialization
and shutdown, operations on the context could (if I understand correctly) be
interleaved, leading to a DAG which contains transformations out of order and
from different customer/day periods.

One solution is instead to launch multiple Spark jobs via spark-submit and
let YARN/Spark's dynamic executor allocation take care of fair scheduling.
In practice, this doesn't seem to yield very fast computation perhaps due
to some additional overhead with YARN.

Is there any safe way to launch concurrent jobs like this using a single
PySpark context?

-- 
Mike Sukmanowsky
Aspiring Digital Carpenter

*e*: mike.sukmanow...@gmail.com

LinkedIn  | github



Re: DAG related query

2015-08-20 Thread Sean Owen
No. The third line creates a third RDD whose reference simply replaces
the reference to the first RDD in your local driver program. The first
RDD still exists.

On Thu, Aug 20, 2015 at 2:15 PM, Bahubali Jain  wrote:
> Hi,
> How would the DAG look like for the below code
>
> JavaRDD rdd1 = context.textFile();
> JavaRDD rdd2 = rdd1.map();
> rdd1 =  rdd2.map();
>
> Does this lead to any kind of cycle?
>
> Thanks,
> Baahu

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DAG related query

2015-08-20 Thread Bahubali Jain
Hi,
How would the DAG look like for the below code

JavaRDD rdd1 = context.textFile();
JavaRDD rdd2 = rdd1.map();
rdd1 =  rdd2.map();

Does this lead to any kind of cycle?

Thanks,
Baahu


Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Dhaval Patel
One more update on this question: I am using Spark 1.4.1.

I was just reading documentation of spark 1.5 (still in development) and I
think there will be a new func *datediff* that will solve the issue. So
please let me know if there is any work-around until spark 1.5 is out :).

pyspark.sql.functions.datediff(*end*, *start*)[source]



Returns the number of days from start to end.

>>> df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
>>> df.select(datediff(df.d2, df.d1).alias('diff')).collect()
[Row(diff=32)]

New in version 1.5.

On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel  wrote:

> Apologies, sent too early accidentally. Actual message is below
> 
>
> A dataframe has 2 datecolumns (datetime type) and I would like to add
> another column that would have difference between these two dates.
> Dataframe snippet is below.
>
> new_df.show(5)
> +---+--+--+
> | PATID| SVCDATE|next_diag_date|
> +---+--+--+
> |12345655545|2012-02-13| 2012-02-13|
> |12345655545|2012-02-13| 2012-02-13|
> |12345655545|2012-02-13| 2012-02-27|
> +---+--+--+
>
>
>
> Here is what I have tried so far:
>
> -> new_df.withColumn('SVCDATE2',
> (new_df.next_diag_date-new_df.SVCDATE)).show()
> Error: DateType does not support numeric operations
>
> -> new_df.withColumn('SVCDATE2',
> (new_df.next_diag_date-new_df.SVCDATE).days).show()
> Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);
>
>
> However this simple python code works fine with pySpark:
>
> from datetime import date
> d0 = date(2008, 8, 18)
> d1 = date(2008, 9, 26)
> delta = d0 - d1
> print (d0 - d1).days
>
> # -39
>
>
> Any suggestions would be appreciated! Also is there a way to add a new
> column in dataframe without using column expression (e.g. like in pandas or
> R. df$new_col = 'new col value')?
>
>
> Thanks,
> Dhaval
>
>
>
> On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel 
> wrote:
>
>> new_df.withColumn('SVCDATE2',
>> (new_df.next_diag_date-new_df.SVCDATE).days).show()
>>
>> +---+--+--+ | PATID| SVCDATE|next_diag_date|
>> +---+--+--+ |12345655545|2012-02-13|
>> 2012-02-13| |12345655545|2012-02-13| 2012-02-13| |12345655545|2012-02-13|
>> 2012-02-27| +---+--+--+
>>
>
>


Re: spark.sql.shuffle.partitions=1 seems to be working fine but creates timeout for large skewed data

2015-08-20 Thread Hemant Bhanawat
Sorry, I misread your mail. Thanks for pointing that out.

BTW, are the 800,000 files shuffle intermediate output and not the final
output? I assume yes. I didn't know that you can keep intermediate output
on HDFS and I don't think that is recommended.




On Thu, Aug 20, 2015 at 2:43 PM, Hemant Bhanawat 
wrote:

> Looks like you are using hash based shuffling and not sort based shuffling
> which creates a single file per maptask.
>
> On Thu, Aug 20, 2015 at 12:43 AM, unk1102  wrote:
>
>> Hi I have a Spark job which deals with large skewed dataset. I have around
>> 1000 Hive partitions to process in four different tables every day. So if
>> I
>> go with 200 spark.sql.shuffle.partitions default partitions created by
>> Spark
>> I end up with 4 * 1000 * 200 = 800,000 small small files in HDFS which won't
>> be
>> good for HDFS name node I have been told if you keep on creating such
>> large
>> no of small small files namenode will crash is it true? please help me
>> understand. Anyways so to avoid creating small files I did set
>> spark.sql.shuffle.partitions=1 it seems to be creating 1 output file and
>> as
>> per my understanding because of only one output there is so much shuffling
>> to do to bring all data to once reducer please correct me if I am wrong.
>> This is causing memory/timeout issues how do I deal with it
>>
>> I tried to give spark.shuffle.storage=0.7 also still this memory seems not
>> enough for it. I have 25 gb executor with 4 cores and 20 such executors
>> still Spark job fails please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-shuffle-partitions-1-seems-to-be-working-fine-but-creates-timeout-for-large-skewed-data-tp24346.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Dhaval Patel
Apologies, sent too early accidentally. Actual message is below


A dataframe has 2 datecolumns (datetime type) and I would like to add
another column that would have difference between these two dates.
Dataframe snippet is below.

new_df.show(5)
+-----------+----------+--------------+
|      PATID|   SVCDATE|next_diag_date|
+-----------+----------+--------------+
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-27|
+-----------+----------+--------------+



Here is what I have tried so far:

-> new_df.withColumn('SVCDATE2',
(new_df.next_diag_date-new_df.SVCDATE)).show()
Error: DateType does not support numeric operations

-> new_df.withColumn('SVCDATE2',
(new_df.next_diag_date-new_df.SVCDATE).days).show()
Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);


However this simple python code works fine with pySpark:

from datetime import date
d0 = date(2008, 8, 18)
d1 = date(2008, 9, 26)
delta = d0 - d1
print (d0 - d1).days

# -39


Any suggestions would be appreciated! Also is there a way to add a new
column in dataframe without using column expression (e.g. like in pandas or
R. df$new_col = 'new col value')?


Thanks,
Dhaval



On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel  wrote:

> new_df.withColumn('SVCDATE2',
> (new_df.next_diag_date-new_df.SVCDATE).days).show()
>
> +---+--+--+ | PATID| SVCDATE|next_diag_date|
> +---+--+--+ |12345655545|2012-02-13|
> 2012-02-13| |12345655545|2012-02-13| 2012-02-13| |12345655545|2012-02-13|
> 2012-02-27| +---+--+--+
>


DataFrameWriter.jdbc is very slow

2015-08-20 Thread Aram Mkrtchyan
We want to migrate our data (approximately 20M rows) from Parquet to Postgres.
When we use the DataFrame writer's jdbc method the execution time is very
long; we tried the same with JDBC batch inserts and it was much more effective.
Is it intentionally implemented in that way?


How to add a new column with date duration from 2 date columns in a dataframe

2015-08-20 Thread Dhaval Patel
new_df.withColumn('SVCDATE2',
(new_df.next_diag_date-new_df.SVCDATE).days).show()

+-----------+----------+--------------+
|      PATID|   SVCDATE|next_diag_date|
+-----------+----------+--------------+
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-13|
|12345655545|2012-02-13|    2012-02-27|
+-----------+----------+--------------+


Data locality with HDFS not being seen

2015-08-20 Thread Sunil
Hello. I am seeing some unexpected issues with achieving HDFS data
locality. I expect the tasks to be executed only on the node which has the
data, but this is not happening (of course, unless the node is busy, in which
case I understand tasks can go to some other node). Could anyone clarify
what's wrong with the way I am trying this, or what I should do instead? Below is
the cluster configuration and the experiments that I have tried. Any help will
be appreciated. If you would like to recreate the below scenario, then you
may use the JavaWordCount.java example given within the spark.

*Cluster configuration:*

1. spark-1.4.0 and hadoop-2.7.1
2. Machines --> Master node (master) and 6 worker nodes (node1 to node6) 
3. master acts as --> spark master, HDFS name node & sec name node, Yarn
resource manager
4. Each of the 6 worker nodes act as --> spark worker node, HDFS data node,
node manager

*Data on HDFS:*

A 20 MB text file is stored in a single block. With a replication factor of 3,
the text file is stored on nodes 2, 3 & 4.

*Test-1 (Spark stand alone mode):*

The application being run is the standard Java word count example with the
above text file in HDFS as input. On job submission, I see in the Spark
web UI that stage 0 (i.e. mapToPair) is being run on random nodes (i.e.
node1, node2, node6, etc.). By random I mean that stage 0 executes on the
very first worker node that gets registered to the application (this can be
seen from the event timeline graph). Rather, I am expecting stage 0 to
be run only on any one of the three nodes 2, 3, or 4.

* Test-2 (Yarn cluster mode): *
Same as above. No data locality seen. 

* Additional info: *
No other Spark applications are running, and I have even tried setting
spark.locality.wait to 10s, but still no difference.

Thanks and regards,
Sunil



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Data-locality-with-HDFS-not-being-seen-tp24361.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to overwrite partition when writing Parquet?

2015-08-20 Thread Romi Kuntsman
Cheng - what if I want to overwrite a specific partition?

I'll try to remove the folder, as Hemant suggested...
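For reference, a sketch of that manual approach, assuming the directory layout
partitionBy produces (date=<value> subfolders); the date value and column name
are illustrative, and parquetDir is the same base path used in the write:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

val day = "2015-08-20" // partition to overwrite

// 1) drop just this partition's folder
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path(s"$parquetDir/date=$day"), true)

// 2) append only this day's rows; the other partitions stay untouched
myDataframe
  .filter(myDataframe("date") === day)
  .write
  .partitionBy("date")
  .mode(SaveMode.Append)
  .parquet(parquetDir)
```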

On Thu, Aug 20, 2015 at 1:17 PM Cheng Lian  wrote:

> You can apply a filter first to filter out data of needed dates and then
> append them.
>
>
> Cheng
>
>
> On 8/20/15 4:59 PM, Hemant Bhanawat wrote:
>
> How can I overwrite only a given partition or manually remove a partition
> before writing?
>
> I don't know if (and I don't think)  there is a way to do that using a
> mode. But doesn't manually deleting the directory of a particular partition
> help? For directory structure, check this out...
>
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
>
>
> On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman  wrote:
>
>> Hello,
>>
>> I have a DataFrame, with a date column which I want to use as a partition.
>> Each day I want to write the data for the same date in Parquet, and then
>> read a dataframe for a date range.
>>
>> I'm using:
>>
>> myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir);
>>
>> If I use SaveMode.Append, then writing data for the same partition adds
>> the same data there again.
>> If I use SaveMode.Overwrite, then writing data for a single partition
>> removes all the data for all partitions.
>>
>> How can I overwrite only a given partition or manually remove a partition
>> before writing?
>>
>> Many thanks!
>> Romi K.
>>
>
>
>


Re: Convert mllib.linalg.Matrix to Breeze

2015-08-20 Thread Yanbo Liang
You can use Matrix.toBreeze()

 .

2015-08-20 18:24 GMT+08:00 Naveen :

> Hi All,
>
> Is there anyway to convert a mllib matrix to a Dense Matrix of Breeze? Any
> leads are appreciated.
>
>
> Thanks,
> Naveen
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark 1.3. Insert into hive parquet partitioned table from DataFrame

2015-08-20 Thread Masf
Hi.

I have a dataframe and I want to insert these data into parquet partitioned
table in Hive.

In Spark 1.4 I can use
df.write.partitionBy("x","y").format("parquet").mode("append").saveAsTable("tbl_parquet")

but in Spark 1.3 I can't. How can I do it?

Thanks

-- 
Regards
Miguel


Convert mllib.linalg.Matrix to Breeze

2015-08-20 Thread Naveen

Hi All,

Is there any way to convert an MLlib matrix to a Breeze DenseMatrix?
Any leads are appreciated.



Thanks,
Naveen

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to overwrite partition when writing Parquet?

2015-08-20 Thread Cheng Lian
You can apply a filter first to filter out data of needed dates and then 
append them.


Cheng

On 8/20/15 4:59 PM, Hemant Bhanawat wrote:
How can I overwrite only a given partition or manually remove a 
partition before writing?


I don't know if (and I don't think)  there is a way to do that using a 
mode. But doesn't manually deleting the directory of a particular 
partition help? For directory structure, check this out...


http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery


On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman > wrote:


Hello,

I have a DataFrame, with a date column which I want to use as a
partition.
Each day I want to write the data for the same date in Parquet,
and then read a dataframe for a date range.

I'm using:

myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir);

If I use SaveMode.Append, then writing data for the same partition
adds the same data there again.
If I use SaveMode.Overwrite, then writing data for a single
partition removes all the data for all partitions.

How can I overwrite only a given partition or manually remove a
partition before writing?

Many thanks!
Romi K.






[SparkR] How to perform a for loop on a DataFrame object

2015-08-20 Thread Florian M
Hi guys, 

First of all, thank you for your amazing work.

As you can see in the subject, I post here because I need to perform a for
loop on a DataFrame object. 

Sample of my Dataset (the entire dataset is ~400k lines long) : 

I use Spark version 1.4.1 with R 3.2.1

I launch sparkR using (the package can be found at
http://spark-packages.org/package/databricks/spark-csv )



I load my dataset from HDFS using the following command (the package is
needed to load a CSV in a Spark DataFrame): 



When I do a summary, the output is : 


What I need to do is to calculate :


But you probably know that we can't do this because the read.df function
returns an S4 object, which is not iterable.

Does anyone know how can I do that ? 
Maybe I have to convert the type of the DataFrame or use another function to
load my dataset...
I have to say that I'm new to Spark and SparkR :)

Thanks for your time,

Florian




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-How-to-perform-a-for-loop-on-a-DataFrame-object-tp24359.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Transformation not happening for reduceByKey or GroupByKey

2015-08-20 Thread satish chandra j
HI All,
I have data in RDD as mentioned below:

RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on
Values for each key

Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)

Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at
:73
res:Array[(Int,Int)] = Array()

Command as mentioned

dse spark --master local --jars postgresql-9.4-1201.jar -i  


Please let me know what is missing in my code, as my resultant Array is
empty



Regards,
Satish
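For comparison, a minimal self-contained version that does return
Array((0,3), (1,50), (2,40)) when pasted into spark-shell, which may help narrow
things down to the -i script invocation rather than the transformation itself:

```scala
val pairs = sc.parallelize(Seq((0, 1), (0, 2), (1, 20), (1, 30), (2, 40)))

// reduceByKey returns a *new* RDD; keep the reference and collect that one
val summed = pairs.reduceByKey(_ + _)
summed.collect() // Array((0,3), (1,50), (2,40)), ordering may differ
```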


SparkSQL concerning materials

2015-08-20 Thread Dawid Wysakowicz
Hi,

I would like to dip into SparkSQL. Get to know better the architecture,
good practices, some internals. Could you advise me some materials on this
matter?

Regards
Dawid


Memory-efficient successive calls to repartition()

2015-08-20 Thread abellet
Hello,

For the need of my application, I need to periodically "shuffle" the data
across nodes/partitions of a reasonably-large dataset. This is an expensive
operation but I only need to do it every now and then. However it seems that
I am doing something wrong because as the iterations go the memory usage
increases, causing the job to spill onto HDFS, which eventually gets full. I
am also getting some "Lost executor" errors that I don't get if I don't
repartition.

Here's a basic piece of code which reproduces the problem:

data = sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
data.count()
for i in range(1000):
data=data.repartition(50).persist()
# below several operations are done on data


What am I doing wrong? I tried the following but it doesn't solve the issue:

for i in range(1000):
data2=data.repartition(50).persist()
data2.count() # materialize rdd
data.unpersist() # unpersist previous version
data=data2


Help and suggestions on this would be greatly appreciated! Thanks a lot!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Out of memory exception in MLlib's naive baye's classification training

2015-08-20 Thread minerva
Hello,
I used Mahout for text classification and now I'm trying it with Spark.

I had the same problem training Bayes with (only) 569 documents.

I solved it by doing htf = HashingTF(5000) instead of htf = HashingTF() (the
default feature space is 2^20). I don't know if it can be considered a long-term
solution (what will happen when training with many more documents?), but I have
two bigger issues at the moment.

My first issue at the moment is the creation of the LabeledPoints for the
Bayes model.
The TF-IDF transformation gives back an RDD of sparse vectors, and I saved my
labels (categories) in another RDD.

I still haven't found a good solution to combine both pieces of information while
creating the LabeledPoints.
My solution costs a lot of collects (one per document). Each collect takes 4
seconds (running on a VM with 16 GB RAM, 8 cores), and it results in roughly 40
minutes just to create the LabeledPoints after the TF-IDF calculation.

My second issue is that saving labels and features separately and combining
them later could cause problems when running on more nodes (I am now running
on a single node), because I cannot be sure that the order of the labels I
saved will match the order of the features in the sparse vectors... or can I?

Is there a post or "best practice" I can read to solve these two issues?
Thanks a lot!
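For the first issue, the usual pattern is to avoid per-document collects
entirely and zip the label RDD with the TF-IDF vectors. This is a Scala sketch
with illustrative toy data; it also speaks to the second issue, because zip
keeps element i of one RDD paired with element i of the other as long as both
RDDs are derived from the same source RDD without an intermediate shuffle:

```scala
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.regression.LabeledPoint

// (label, tokens) pairs parsed from the corpus; illustrative toy data
val docs = sc.parallelize(Seq(
  (0.0, Seq("spark", "mllib", "bayes")),
  (1.0, Seq("mahout", "text", "classification"))
))

val labels = docs.map(_._1)
val tf = new HashingTF(5000).transform(docs.map(_._2))
tf.cache()
val tfidf = new IDF().fit(tf).transform(tf)

// same source, same partitioning, no shuffle in between => zip stays aligned
val training = labels.zip(tfidf).map { case (label, features) =>
  LabeledPoint(label, features)
}
```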



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-exception-in-MLlib-s-naive-baye-s-classification-training-tp14809p24357.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQLContext Create Table Problem

2015-08-20 Thread Yusuf Can Gürkan
Does anyone have any idea about this subject?
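One thing worth checking, given that the import works in spark-shell but not in
your own build: HiveContext lives in the separate spark-hive module, so the
project you compile and spark-submit needs that artifact on its compile
classpath even when the cluster's Spark already has Hive support. A hedged sbt
sketch (versions are illustrative for Spark 1.4.1):

```scala
// build.sbt
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.4.1" % "provided",
  // brings in org.apache.spark.sql.hive.HiveContext
  "org.apache.spark" %% "spark-hive" % "1.4.1" % "provided"
)
```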

> On 20 Aug 2015, at 00:47, Yusuf Can Gürkan  wrote:
> 
> Hey,
> 
> This is my spark-env:
> 
> # Add Hadoop libraries to Spark classpath
> SPARK_CLASSPATH="${SPARK_CLASSPATH}:${HADOOP_HOME}/*:${HADOOP_HOME}/../hadoop-hdfs/*:${HADOOP_HOME}/../hadoop-mapreduce/*:${HADOOP_HOME}/../hadoop-yarn/*"
> LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${HADOOP_HOME}/lib/native"
> 
> # Add Hive conf to Spark classpath
> SPARK_CLASSPATH="${SPARK_CLASSPATH}:${HIVE_CONF_DIR}"
> 
> # Add Hadoop LZO library to Spark classpath and native library to library path
> SPARK_CLASSPATH="${SPARK_CLASSPATH}:${HADOOP_HOME}/../hadoop-lzo/lib/*"
> LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${HADOOP_HOME}/../hadoop-lzo/lib/native"
> 
> # Add EMRFS libraries to Spark classpath
> SPARK_CLASSPATH="${SPARK_CLASSPATH}:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*"
> 
> export SPARK_CLASSPATH
> export LD_LIBRARY_PATH
> 
> I think hive is on class path, should i do another thing?
> 
>> On 19 Aug 2015, at 20:12, Eric Walker mailto:e...@node.io>> 
>> wrote:
>> 
>> Hi,
>> 
>> On EMR, take a look at the "SPARK_CLASSPATH" variable in 
>> /home/hadoop/spark/conf/spark-env.sh.  You may need to add a jar file to it.
>> 
>> Regards,
>> Eric
>> 
>> 
> >> On Wed, Aug 19, 2015 at 11:03 AM, Yusuf Can Gürkan wrote:
>> Hey Yin,
>> 
> >> Thanks for the answer. I thought this could be the problem, but I cannot
> >> create a HiveContext because I cannot import
> >> org.apache.spark.sql.hive.HiveContext. It does not see this package.
> >> 
> >> I read that I should build Spark with -Phive, but I'm running on Amazon EMR
> >> 1.4.1, and in spark-shell I can import the hive package but cannot do the
> >> same with spark-submit. Do you have any idea why? Because if it's related
> >> to building with -Phive, how can I import it in spark-shell?
>> 
> >>> On 19 Aug 2015, at 18:59, Yin Huai wrote:
>>> 
>>> Can you try to use HiveContext instead of SQLContext? Your query is trying 
>>> to create a table and persist the metadata of the table in metastore, which 
>>> is only supported by HiveContext.
>>> 
> >>> On Wed, Aug 19, 2015 at 8:44 AM, Yusuf Can Gürkan wrote:
>>> Hello,
>>> 
>>> I’m trying to create a table with sqlContext.sql method as below:
>>> 
>>> val sc = new SparkContext()
>>> val sqlContext = new SQLContext(sc)
>>> 
>>> import sqlContext.implicits._
>>> 
>>> sqlContext.sql(s"""
>>> create table if not exists landing (
>>> date string,
>>> referrer string
>>> )
>>> partitioned by (partnerid string,dt string)
>>> row format delimited fields terminated by '\t' lines terminated by '\n'
> >>> STORED AS TEXTFILE LOCATION 's3n://...'
> >>>   """)
>>> 
>>> 
>>> It gives error on spark-submit:
>>> 
>>> Exception in thread "main" java.lang.RuntimeException: [2.1] failure: 
>>> ``with'' expected but identifier create found
>>> 
>>> create external table if not exists landing (
>>> 
>>> ^
>>> at scala.sys.package$.error(package.scala:27)
>>> at 
>>> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
>>> at 
>>> org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
>>> 
>>> 
>>> 
>>> What can be the reason??
>>> 
>> 
>> 
> 



Re: spark.sql.shuffle.partitions=1 seems to be working fine but creates timeout for large skewed data

2015-08-20 Thread Hemant Bhanawat
Looks like you are using hash-based shuffling rather than sort-based
shuffling, which creates a single file per map task.
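
A minimal sketch of how the shuffle manager can be set explicitly; the values
are illustrative only, and sort-based shuffle is already the default since
Spark 1.2.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative settings: the sort-based manager writes one data file per map
// task, while the hash-based manager writes one file per map task per reduce
// partition.
val conf = new SparkConf()
  .setAppName("SkewedHivePartitionsJob")
  .set("spark.shuffle.manager", "sort")        // "hash" is the alternative
  .set("spark.sql.shuffle.partitions", "200")  // keep a sensible reducer count instead of 1
val sc = new SparkContext(conf)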

On Thu, Aug 20, 2015 at 12:43 AM, unk1102  wrote:

> Hi, I have a Spark job which deals with a large skewed dataset. I have around
> 1000 Hive partitions to process in four different tables every day. So if I
> go with the default of 200 spark.sql.shuffle.partitions created by Spark,
> I end up with 4 * 1000 * 200 = 800,000 small files in HDFS, which won't be
> good for the HDFS NameNode. I have been told that if you keep creating such a
> large number of small files the NameNode will crash; is that true? Please
> help me understand. Anyway, to avoid creating small files I set
> spark.sql.shuffle.partitions=1. It seems to create one output file, and as
> per my understanding, because there is only one output, there is a lot of
> shuffling to bring all data to a single reducer; please correct me if I am
> wrong. This is causing memory/timeout issues. How do I deal with it?
>
> I also tried setting spark.shuffle.storage=0.7, but the memory still seems
> not to be enough. I have 25 GB executors with 4 cores and 20 such executors,
> and the Spark job still fails. Please guide.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-shuffle-partitions-1-seems-to-be-working-fine-but-creates-timeout-for-large-skewed-data-tp24346.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


MLlib Prefixspan implementation

2015-08-20 Thread Alexis Gillain
I want to use PrefixSpan, so I had a look at the code and the cited paper:
"Distributed PrefixSpan Algorithm Based on MapReduce".

There is a result in the paper I didn't really understand, and I couldn't
find where it is used in the code.

Suppose a sequence database S = {1, 2, ..., n} and a sequence that is a
length-(L-1) (2≤L≤n) sequential pattern. In the projected databases whose
prefix is that length-(L-1) sequential pattern, when the support count of an
item is not less than min_support, obtaining a length-L sequential pattern
< a ... a > from the projected databases is equivalent to obtaining that
length-L sequential pattern < a ... a > from the sequence database S.

According to the paper, this is supposed to add a pruning step in the reduce
function, but I couldn't find where.

This result seems to come from an earlier paper: "Wang Linlin, Fan Jun.
Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan [J].
Computer Engineering, 2009, 35(23): 56-61", but it didn't help me understand
the result or how it improves the algorithm.
-- 
Alexis GILLAIN
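
For context, a short usage sketch of the implementation under discussion,
assuming the MLlib PrefixSpan API as released in the 1.5 line; this shows the
public interface only, not the distributed pruning step asked about above,
and the sequence values are made up for illustration.

import org.apache.spark.mllib.fpm.PrefixSpan

// Each sequence is an array of itemsets.
val sequences = sc.parallelize(Seq(
  Array(Array(1, 2), Array(3)),
  Array(Array(1), Array(3, 2), Array(1, 2)),
  Array(Array(1, 2), Array(5)),
  Array(Array(6))
), 2).cache()

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5)
val model = prefixSpan.run(sequences)

model.freqSequences.collect().foreach { fs =>
  println(fs.sequence.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]") + ", " + fs.freq)
}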


Re: How to overwrite partition when writing Parquet?

2015-08-20 Thread Hemant Bhanawat
How can I overwrite only a given partition or manually remove a partition
before writing?

I don't know if (and I don't think) there is a way to do that using a save
mode. But wouldn't manually deleting the directory of that particular
partition help? For the directory structure, check this out...

http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
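
A minimal sketch of the delete-then-append approach, reusing the myDataframe
name from the message below and a hypothetical S3 location; partitionBy("date")
writes one subdirectory per value (e.g. .../date=2015-08-20), which is what
gets removed here.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode

// Hypothetical location and partition value.
val parquetDir = "s3n://your-bucket/events"
val dateToRewrite = "2015-08-20"

// 1. Remove only the directory of the partition that is being rewritten.
val fs = new Path(parquetDir).getFileSystem(sc.hadoopConfiguration)
fs.delete(new Path(s"$parquetDir/date=$dateToRewrite"), true)  // recursive delete

// 2. Append just that day's data; every other partition stays untouched.
myDataframe
  .filter(myDataframe("date") === dateToRewrite)
  .write.partitionBy("date").mode(SaveMode.Append).parquet(parquetDir)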


On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman  wrote:

> Hello,
>
> I have a DataFrame, with a date column which I want to use as a partition.
> Each day I want to write the data for the same date in Parquet, and then
> read a dataframe for a date range.
>
> I'm using:
>
> myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir);
>
> If I use SaveMode.Append, then writing data for the same partition adds
> the same data there again.
> If I use SaveMode.Overwrite, then writing data for a single partition
> removes all the data for all partitions.
>
> How can I overwrite only a given partition or manually remove a partition
> before writing?
>
> Many thanks!
> Romi K.
>


Re: persist for DStream

2015-08-20 Thread Hemant Bhanawat
Are you asking for something more than this?

http://spark.apache.org/docs/latest/streaming-programming-guide.html#caching--persistence
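
In code, a DStream exposes the same persist()/cache() calls as an RDD. A
minimal sketch with hypothetical Kafka settings (ZooKeeper quorum, group id,
topic):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("PersistDStreamExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Hypothetical Kafka settings; createStream yields (key, message) pairs.
val messages = KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("my-topic" -> 1))
  .map(_._2)

// Keep each batch's RDD in memory so both outputs below reuse it instead of
// recomputing the transformation.
messages.persist(StorageLevel.MEMORY_ONLY_SER)  // cache() uses the same default level for DStreams

messages.count().print()
messages.foreachRDD(rdd => rdd.take(5).foreach(println))

ssc.start()
ssc.awaitTermination()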



On Thu, Aug 20, 2015 at 2:09 PM, Deepesh Maheshwari <
deepesh.maheshwar...@gmail.com> wrote:

> Hi,
>
> There are functions available to cache() or persist() an RDD in memory, but
> I am reading data from Kafka in the form of a DStream, applying operations
> to it, and I want to persist that DStream in memory for further use.
>
> Please suggest a method by which I can persist a DStream in memory.
>
> Regards,
> Deepesh
>


persist for DStream

2015-08-20 Thread Deepesh Maheshwari
Hi,

There are functions available to cache() or persist() an RDD in memory, but I
am reading data from Kafka in the form of a DStream, applying operations to
it, and I want to persist that DStream in memory for further use.

Please suggest a method by which I can persist a DStream in memory.

Regards,
Deepesh


Re: "insert overwrite table phonesall" in spark-sql resulted in java.io.StreamCorruptedException

2015-08-20 Thread John Jay
The answer is that my table was not serialized with Kryo, but I started the
spark-sql shell with Kryo enabled, so the data could not be deserialized.
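
For reference, a sketch of the kind of serializer setting involved; how it was
actually set in the original setup is not shown in the thread, so the names
and values below are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative: a session configured to use Kryo. Bytes serialized with the
// default JavaSerializer cannot be read back by Kryo (and vice versa), which
// is the kind of mismatch described above.
val conf = new SparkConf()
  .setAppName("KryoSerializedJob")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)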



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/insert-overwrite-table-phonesall-in-spark-sql-resulted-in-java-io-StreamCorruptedException-tp23579p24354.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org