the spark job is so slow during shuffle - almost frozen

2016-07-18 Thread Zhiliang Zhu



Hi All,

In the Spark UI, the shuffle stage of one job shows 198/200 tasks finished
and then appears almost frozen. Most executors show 0 bytes of shuffle data,
but just one executor shows about 1 GB.

Moreover, in several of the join operations the situation is like this: one
table or pair RDD has only about 40 keys, but the other table has around
10,000 keys.

Could this be diagnosed as a data skew issue?

Any help or comment will be deeply appreciated.
Thanks in advance ~


Here we have one application that needs to extract different columns from 6
Hive tables and then do some easy calculation; there are around 100,000 rows
in each table. Finally it needs to output another table or file (with a
consistent set of columns).

However, after many days of trying, the Spark Hive job is unthinkably slow -
sometimes almost frozen. The Spark cluster has 5 nodes.

Could anyone offer some help? Any idea or clue is also good.

Thanks in advance~
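
For reference, the skew pattern described above (one join key carrying most
of the shuffle data) is often handled by salting the hot key. The sketch
below is a minimal illustration under the assumption that the small side can
be replicated; the RDD names, their contents and the salt factor N are made
up, not taken from this job.

import scala.util.Random

val bigRdd   = sc.parallelize(Seq.fill(100000)(("hotKey", 1)))        // skewed side: one key dominates
val smallRdd = sc.parallelize(Seq(("hotKey", 10), ("otherKey", 20)))  // small side (~40 keys in the job above)

val N = 50
// spread the hot key of the big side over N sub-keys, and replicate the small
// side N times so every sub-key still finds its match
val saltedBig   = bigRdd.map { case (k, v) => ((k, Random.nextInt(N)), v) }
val saltedSmall = smallRdd.flatMap { case (k, v) => (0 until N).map(i => ((k, i), v)) }

val joined = saltedBig.join(saltedSmall)
  .map { case ((k, _), vs) => (k, vs) }                               // drop the salt after joining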



On Tuesday, July 19, 2016 11:05 AM, Zhiliang Zhu  wrote:

 

 Hi Mungeol,
Thanks a lot for your help. I will try that. 

On Tuesday, July 19, 2016 9:21 AM, Mungeol Heo  
wrote:
 

Try to run an action at an intermediate stage of your job process, like
save, insertInto, etc.
Hope it can help you out.
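
A sketch of this suggestion, assuming the HiveContext used elsewhere in the
thread and purely hypothetical table names: forcing an action in the middle
of the plan materializes the intermediate result, so the long chain of joins
is executed (and can be timed or debugged) in smaller pieces.

val joined = tableA.join(tableB, "id")                          // hypothetical intermediate DataFrame
joined.write.mode("overwrite").saveAsTable("tmp_joined")        // an action: this part of the job runs here

val next = hiveContext.table("tmp_joined").join(tableC, "id")   // later stages start from the saved result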

On Mon, Jul 18, 2016 at 7:33 PM, Zhiliang Zhu
 wrote:
> Thanks a lot for your reply .
>
> In effect, here we tried to run the SQL on Kettle, Hive and Spark Hive (by
> HiveContext) respectively, and the job seems frozen and never finishes.
>
> From the 6 tables we need to read different columns in the different tables
> for the specific information, then do some simple calculation before the
> output.
> The join operation is used most in the SQL.
>
> Best wishes!
>
>
>
>
> On Monday, July 18, 2016 6:24 PM, Chanh Le  wrote:
>
>
> Hi,
> What about the network (bandwidth) between hive and spark?
> Does it run in Hive before then you move to Spark?
> Because It's complex you can use something like EXPLAIN command to show what
> going on.
>
>
>
>
>
>
> On Jul 18, 2016, at 5:20 PM, Zhiliang Zhu 
> wrote:
>
> The SQL logic in the program is very complex, so I do not describe the
> detailed code here.
>
>
> On Monday, July 18, 2016 6:04 PM, Zhiliang Zhu 
> wrote:
>
>
> Hi All,
>
> Here we have one application, it needs to extract different columns from 6
> hive tables, and then does some easy calculation, there is around 100,000
> number of rows in each table,
> finally need to output another table or file (with format of consistent
> columns) .
>
>  However, after lots of days trying, the spark hive job is unthinkably slow
> - sometimes almost frozen. There is 5 nodes for spark cluster.
>
> Could anyone offer some help, some idea or clue is also good.
>
> Thanks in advance~
>
> Zhiliang
>

Unsubscribe

2016-07-18 Thread Jinan Alhajjaj
  

Re: Dynamically get value based on Map key in Spark Dataframe

2016-07-18 Thread Divya Gehlot
Hi Jacek,

Can you please share an example of how I can access the broadcasted map?

val pltStnMapBrdcst = sc.broadcast(keyvalueMap)
val df_replacekeys = df_input.withColumn("map_values",
pltStnMapBrdcst.value.get("key"))

Is the above the right way to access the broadcasted map ?



Thanks,
Divya
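
A minimal sketch of the broadcast-variable route suggested in the reply
below, reusing the names from the snippet above (the map contents are
hypothetical). The point is that the lookup has to be wrapped in a UDF so it
runs per row on the executors; calling pltStnMapBrdcst.value.get("key")
directly inside withColumn is evaluated once on the driver against the
literal string "key" rather than against each row's key column.

import org.apache.spark.sql.functions.udf

val keyValueMap = Map("S001" -> "Station A", "S002" -> "Station B")   // hypothetical contents
val pltStnMapBrdcst = sc.broadcast(keyValueMap)

// executed on the executors for every row; missing keys become null
val lookupValue = udf((key: String) => pltStnMapBrdcst.value.getOrElse(key, null: String))

val df_replacekeys = df_input.withColumn("map_values", lookupValue(df_input("key")))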


On 18 July 2016 at 23:06, Jacek Laskowski  wrote:

> See broadcast variable.
>
> Or (just a thought) do join between DataFrames.
>
> Jacek
>
> On 18 Jul 2016 9:24 a.m., "Divya Gehlot"  wrote:
>
>> Hi,
>>
>> I have created a map by reading a text file
>> val keyValueMap = file_read.map(t => t.getString(0) ->
>> t.getString(4)).collect().toMap
>>
>> Now I have another dataframe where I need to dynamically replace all the
>> keys of Map with values
>> val df_input = reading the file as dataframe
>> val df_replacekeys =
>> df_input.withColumn("map_values",lit(keyValueMap (col("key"
>>
>> Would really appreciate the help .
>>
>>
>> Thanks,
>> Divya
>>
>>
>>


Re: Execute function once on each node

2016-07-18 Thread Josh Asplund
The spark workers are running side-by-side with scientific simulation code.
The code writes output to local SSDs to keep latency low. Due to the volume
of data being moved (10's of terabytes +), it isn't really feasible to copy
the data to a global filesystem. Executing a function on each node would
allow us to read the data in situ without a copy.

I understand that manually assigning tasks to nodes reduces fault
tolerance, but the simulation codes already explicitly assign tasks, so a
failure of any one node is already a full-job failure.
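
For reference, a sketch of one workaround that is sometimes suggested for
this situation, with hypothetical hostnames: SparkContext.makeRDD accepts
per-element location preferences, so one element can be targeted at each
node. These preferences are scheduling hints rather than guarantees, so this
does not remove the caveat raised in the reply quoted below.

val hosts = Seq("node1", "node2", "node3", "node4")             // hypothetical worker hostnames
val oneElementPerNode = sc.makeRDD(hosts.map(h => (h, Seq(h)))) // element h prefers to run on host h

val results = oneElementPerNode.map { h =>
  // node-local work goes here, e.g. reading the simulation output on the local SSD
  (h, java.net.InetAddress.getLocalHost.getHostName)            // record where the task actually ran
}.collect()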

On Mon, Jul 18, 2016 at 3:43 PM Aniket Bhatnagar 
wrote:

> You can't assume that the number of nodes will be constant as some may
> fail, hence you can't guarantee that a function will execute at most once
> or at least once on a node. Can you explain your use case in a bit more
> detail?
>
> On Mon, Jul 18, 2016, 10:57 PM joshuata  wrote:
>
>> I am working on a spark application that requires the ability to run a
>> function on each node in the cluster. This is used to read data from a
>> directory that is not globally accessible to the cluster. I have tried
>> creating an RDD with n elements and n partitions so that it is evenly
>> distributed among the n nodes, and then mapping a function over the RDD.
>> However, the runtime makes no guarantees that each partition will be
>> stored
>> on a separate node. This means that the code will run multiple times on
>> the
>> same node while never running on another.
>>
>> I have looked through the documentation and source code for both RDDs and
>> the scheduler, but I haven't found anything that will do what I need. Does
>> anybody know of a solution I could use?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Execute-function-once-on-each-node-tp27351.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: spark-submit local and Akka startup timeouts

2016-07-18 Thread Bryan Cutler
Hi Rory, for starters what version of Spark are you using?  I believe that
in a 1.5.? release (I don't know which one off the top of my head) there
was an addition that would also display the config property when a timeout
happened.  That might help some if you are able to upgrade.

On Jul 18, 2016 9:34 AM, "Rory Waite"  wrote:

> Hi All,
>
> We have created a regression test for a spark job that is executed during
> our automated build. It executes a spark-submit with a local master,
> processes some data, and the exits. We have an issue in that we get a
> non-deterministic timeout error. It seems to be when the spark context
> tries to initialise Akka (stack trace below). It doesn't happen often, but
> when it does it causes the whole build to fail.
>
> The machines that run these tests get very heavily loaded, with many
> regression tests running simultaneously. My theory is that the spark-submit
> is sometimes unable to initialise Akka in time because the machines are so
> heavily loaded with the other tests. My first thought was to try to tune
> some parameter to extend the timeout, but I couldn't find anything in the
> documentation. The timeout is short at 10s, whereas the default akka
> timeout is set at 100s.
>
> Is there a way to adjust this timeout?
>
> 16/07/17 00:04:22 ERROR SparkContext: Error initializing SparkContext.
> java.util.concurrent.TimeoutException: Futures timed out after [10000
> milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at akka.remote.Remoting.start(Remoting.scala:179)
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
> at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
> at
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
> at
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1964)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1955)
> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
> at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
> at org.apache.spark.SparkContext.(SparkContext.scala:457)
> at com.sdl.nntrainer.NNTrainer$.main(NNTrainer.scala:418)
> at com.sdl.nntrainer.NNTrainer.main(NNTrainer.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 16/07/17 00:04:22 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
> down remote daemon.
> 16/07/17 00:04:22 INFO SparkContext: Successfully stopped SparkContext
> Exception in thread "main" java.util.concurrent.TimeoutException: Futures
> timed out after [10000 milliseconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at akka.remote.Remoting.start(Remoting.scala:179)
> at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
> at 

ApacheCon: Getting the word out internally

2016-07-18 Thread Melissa Warnkin
ApacheCon: Getting the word out internally
Dear Apache Enthusiast,

As you are no doubt already aware, we will be holding ApacheCon in
Seville, Spain, the week of November 14th, 2016. The call for papers
(CFP) for this event is now open, and will remain open until
September 9th.

The event is divided into two parts, each with its own CFP. The first
part of the event, called Apache Big Data, focuses on Big Data
projects and related technologies.

Website: http://events.linuxfoundation.org/events/apache-big-data-europe
CFP:
http://events.linuxfoundation.org/events/apache-big-data-europe/program/cfp

The second part, called ApacheCon Europe, focuses on the Apache
Software Foundation as a whole, covering all projects, community
issues, governance, and so on.

Website: http://events.linuxfoundation.org/events/apachecon-europe
CFP: http://events.linuxfoundation.org/events/apachecon-europe/program/cfp

ApacheCon is the official conference of the Apache Software
Foundation, and is the best place to meet members of your project and
other ASF projects, and strengthen your project's community.

If your organization is interested in sponsoring ApacheCon, contact Rich Bowen
at e...@apache.org. ApacheCon is a great place to find the brightest
developers in the world, and experts on a huge range of technologies.

I hope to see you in Seville!
==

Melissa, on behalf of the ApacheCon Team


Re: Execute function once on each node

2016-07-18 Thread Aniket Bhatnagar
You can't assume that the number of nodes will be constant as some may
fail, hence you can't guarantee that a function will execute at most once
or at least once on a node. Can you explain your use case in a bit more
detail?

On Mon, Jul 18, 2016, 10:57 PM joshuata  wrote:

> I am working on a spark application that requires the ability to run a
> function on each node in the cluster. This is used to read data from a
> directory that is not globally accessible to the cluster. I have tried
> creating an RDD with n elements and n partitions so that it is evenly
> distributed among the n nodes, and then mapping a function over the RDD.
> However, the runtime makes no guarantees that each partition will be stored
> on a separate node. This means that the code will run multiple times on the
> same node while never running on another.
>
> I have looked through the documentation and source code for both RDDs and
> the scheduler, but I haven't found anything that will do what I need. Does
> anybody know of a solution I could use?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Execute-function-once-on-each-node-tp27351.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: transtition SQLContext to SparkSession

2016-07-18 Thread Reynold Xin
Good idea.

https://github.com/apache/spark/pull/14252



On Mon, Jul 18, 2016 at 12:16 PM, Michael Armbrust 
wrote:

> + dev, reynold
>
> Yeah, thats a good point.  I wonder if SparkSession.sqlContext should be
> public/deprecated?
>
> On Mon, Jul 18, 2016 at 8:37 AM, Koert Kuipers  wrote:
>
>> in my codebase i would like to gradually transition to SparkSession, so
>> while i start using SparkSession i also want a SQLContext to be available
>> as before (but with a deprecated warning when i use it). this should be
>> easy since SQLContext is now a wrapper for SparkSession.
>>
>> so basically:
>> val session = SparkSession.builder.set(..., ...).getOrCreate()
>> val sqlc = new SQLContext(session)
>>
>> however this doesnt work, the SQLContext constructor i am trying to use
>> is private. SparkSession.sqlContext is also private.
>>
>> am i missing something?
>>
>> a non-gradual switch is not very realistic in any significant codebase,
>> and i do not want to create SparkSession and SQLContext independendly (both
>> from same SparkContext) since that can only lead to confusion and
>> inconsistent settings.
>>
>
>


Re: transtition SQLContext to SparkSession

2016-07-18 Thread Benjamin Kim
From what I read, there are no more separate contexts:

"SparkContext, SQLContext, HiveContext merged into SparkSession"

I have not tested it, so I don't know whether it's true.

Cheers,
Ben


> On Jul 18, 2016, at 8:37 AM, Koert Kuipers  wrote:
> 
> in my codebase i would like to gradually transition to SparkSession, so while 
> i start using SparkSession i also want a SQLContext to be available as before 
> (but with a deprecated warning when i use it). this should be easy since 
> SQLContext is now a wrapper for SparkSession.
> 
> so basically:
> val session = SparkSession.builder.set(..., ...).getOrCreate()
> val sqlc = new SQLContext(session)
> 
> however this doesnt work, the SQLContext constructor i am trying to use is 
> private. SparkSession.sqlContext is also private.
> 
> am i missing something?
> 
> a non-gradual switch is not very realistic in any significant codebase, and i 
> do not want to create SparkSession and SQLContext independendly (both from 
> same SparkContext) since that can only lead to confusion and inconsistent 
> settings.



Execute function once on each node

2016-07-18 Thread joshuata
I am working on a spark application that requires the ability to run a
function on each node in the cluster. This is used to read data from a
directory that is not globally accessible to the cluster. I have tried
creating an RDD with n elements and n partitions so that it is evenly
distributed among the n nodes, and then mapping a function over the RDD.
However, the runtime makes no guarantees that each partition will be stored
on a separate node. This means that the code will run multiple times on the
same node while never running on another.

I have looked through the documentation and source code for both RDDs and
the scheduler, but I haven't found anything that will do what I need. Does
anybody know of a solution I could use?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Execute-function-once-on-each-node-tp27351.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark driver getting out of memory

2016-07-18 Thread Mich Talebzadeh
can you please clarify:


   1. In what mode are you running Spark: standalone, yarn-client,
   yarn-cluster, etc.?
   2. You have 4 nodes with each executor having 10G. How many actual
   executors do you see in the UI (port 4040 by default)?
   3. What is master memory? Are you referring to driver memory? Maybe I am
   misunderstanding this.
   4. The only real correlation I see with the driver memory is when you
   are running in local mode, where the worker lives within the JVM process
   that you start with spark-shell etc. In that case driver memory matters.
   However, it appears that you are running in another mode with 4 nodes?

Can you get a snapshot of your environment tab in UI and send the output
please?

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 18 July 2016 at 11:50, Saurav Sinha  wrote:

> I have set --driver-memory 5g. I need to understand that as no of
> partition increase drive-memory need to be increased. What will be best
> ration of No of partition/drive-memory.
>
> On Mon, Jul 18, 2016 at 4:07 PM, Zhiliang Zhu  wrote:
>
>> try to set --driver-memory xg , x would be as large as can be set .
>>
>>
>> On Monday, July 18, 2016 6:31 PM, Saurav Sinha 
>> wrote:
>>
>>
>> Hi,
>>
>> I am running spark job.
>>
>> Master memory - 5G
>> executor memort 10G(running on 4 node)
>>
>> My job is getting killed as no of partition increase to 20K.
>>
>> 16/07/18 14:53:13 INFO DAGScheduler: Got job 17 (foreachPartition at
>> WriteToKafka.java:45) with 13524 output partitions (allowLocal=false)
>> 16/07/18 14:53:13 INFO DAGScheduler: Final stage: ResultStage
>> 640(foreachPartition at WriteToKafka.java:45)
>> 16/07/18 14:53:13 INFO DAGScheduler: Parents of final stage:
>> List(ShuffleMapStage 518, ShuffleMapStage 639)
>> 16/07/18 14:53:23 INFO DAGScheduler: Missing parents: List()
>> 16/07/18 14:53:23 INFO DAGScheduler: Submitting ResultStage 640
>> (MapPartitionsRDD[271] at map at BuildSolrDocs.java:209), which has no
>> missing
>> parents
>> 16/07/18 14:53:23 INFO MemoryStore: ensureFreeSpace(8248) called with
>> curMem=41923262, maxMem=2778778828
>> 16/07/18 14:53:23 INFO MemoryStore: Block broadcast_90 stored as values
>> in memory (estimated size 8.1 KB, free 2.5 GB)
>> Exception in thread "dag-scheduler-event-loop"
>> java.lang.OutOfMemoryError: Java heap space
>> at
>> org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
>> at
>> org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
>> at
>> org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:294)
>> at
>> org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:273)
>> at
>> org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:197)
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
>>
>>
>> Help needed.
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>>
>>
>
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
>


Re: transtition SQLContext to SparkSession

2016-07-18 Thread Michael Armbrust
+ dev, reynold

Yeah, that's a good point.  I wonder if SparkSession.sqlContext should be
public/deprecated?

On Mon, Jul 18, 2016 at 8:37 AM, Koert Kuipers  wrote:

> in my codebase i would like to gradually transition to SparkSession, so
> while i start using SparkSession i also want a SQLContext to be available
> as before (but with a deprecated warning when i use it). this should be
> easy since SQLContext is now a wrapper for SparkSession.
>
> so basically:
> val session = SparkSession.builder.set(..., ...).getOrCreate()
> val sqlc = new SQLContext(session)
>
> however this doesnt work, the SQLContext constructor i am trying to use is
> private. SparkSession.sqlContext is also private.
>
> am i missing something?
>
> a non-gradual switch is not very realistic in any significant codebase,
> and i do not want to create SparkSession and SQLContext independendly (both
> from same SparkContext) since that can only lead to confusion and
> inconsistent settings.
>


Re: Question About OFF_HEAP Caching

2016-07-18 Thread Bin Fan
Here is one blog illustrating how to use Spark on Alluxio for this purpose.
Hope it will help:

http://www.alluxio.com/2016/04/getting-started-with-alluxio-and-spark/

On Mon, Jul 18, 2016 at 6:36 AM, Gene Pang  wrote:

> Hi,
>
> If you want to use Alluxio with Spark 2.x, it is recommended to write to
> and read from Alluxio with files. You can save an RDD with saveAsObjectFile
> with an Alluxio path (alluxio://host:port/path/to/file), and you can read
> that file from any other Spark job. Here is additional information on how
> to run Spark with Alluxio:
> http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html
>
> Hope that helps,
> Gene
>
> On Mon, Jul 18, 2016 at 12:11 AM, condor join 
> wrote:
>
>> Hi All,
>>
>> I have some questions about OFF_HEAP caching. In Spark 1.x, when we use
>> rdd.persist(StorageLevel.OFF_HEAP), that means the RDD is cached in
>> Tachyon (Alluxio). However, in Spark 2.x we can directly use OFF_HEAP for
>> caching
>> (
>> https://issues.apache.org/jira/browse/SPARK-13992?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22off-heap%20caching%22).
>> I am confused about this and I have the following questions:
>>
>> 1. In Spark 2.x, how should we use Tachyon for caching?
>>
>> 2. Is there any reason that it must change in this way (I mean using
>> OFF_HEAP directly instead of using Tachyon)?
>>
>> Thanks a lot!
>>
>>
>>
>>
>


Re: pyspark 1.5 0 save model ?

2016-07-18 Thread Holden Karau
If you used RandomForestClassifier from mllib you can use the save method
described in
http://spark.apache.org/docs/1.5.0/api/python/pyspark.mllib.html#module-pyspark.mllib.classification
which will write out some JSON metadata as well as parquet for the actual
model.
For the newer ml pipeline one it seems that the writable traits were added
on April 8th to master - so it isn't available yet.

On Mon, Jul 18, 2016 at 2:58 AM, pseudo oduesp 
wrote:

> Hi,
> How can I save a model under pyspark 1.5.0?
> I use RandomForestClassifier().
> Thanks in advance.
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


spark-submit local and Akka startup timeouts

2016-07-18 Thread Rory Waite
Hi All,

We have created a regression test for a spark job that is executed during our 
automated build. It executes a spark-submit with a local master, processes some 
data, and the exits. We have an issue in that we get a non-deterministic 
timeout error. It seems to be when the spark context tries to initialise Akka 
(stack trace below). It doesn't happen often, but when it does it causes the 
whole build to fail.

The machines that run these tests get very heavily loaded, with many regression 
tests running simultaneously. My theory is that the spark-submit is sometimes 
unable to initialise Akka in time because the machines are so heavily loaded 
with the other tests. My first thought was to try to tune some parameter to 
extend the timeout, but I couldn't find anything in the documentation. The 
timeout is short at 10s, whereas the default akka timeout is set at 100s.

Is there a way to adjust this timeout?

16/07/17 00:04:22 ERROR SparkContext: Error initializing SparkContext.
java.util.concurrent.TimeoutException: Futures timed out after [10000
milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:179)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
at 
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1964)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1955)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
at org.apache.spark.SparkContext.(SparkContext.scala:457)
at com.sdl.nntrainer.NNTrainer$.main(NNTrainer.scala:418)
at com.sdl.nntrainer.NNTrainer.main(NNTrainer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/07/17 00:04:22 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down 
remote daemon.
16/07/17 00:04:22 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed
out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:179)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:620)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:617)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:617)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:634)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
at 
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
at 

Re: Trouble while running spark at ec2 cluster

2016-07-18 Thread Andy Davidson
Hi Hassan

Typically I log on to my master to submit my app.

[ec2-user@ip-172-31-11-222 bin]$ echo $SPARK_ROOT

/root/spark



[ec2-user@ip-172-31-11-222 bin]$echo $MASTER_URL

spark://ec2-54-215-11-222.us-west-1.compute.amazonaws.com:7077



[ec2-user@ip-172-31-11-222 bin]$ $SPARK_ROOT/bin/spark-submit \

--class "com.pws.sparkStreaming.collector.StreamingKafkaCollector" \

--master $MASTER_URL







I think you might be trying to launch your application from a machine
outside of your EC2 cluster. I do not think that is going to work when you
submit to port 7077 because the driver is going to be on your local machine.
Also, you probably have a firewall issue.



Andy


From:  Hassaan Chaudhry 
Date:  Friday, July 15, 2016 at 9:32 PM
To:  "user @spark" 
Subject:  Trouble while running spark at ec2 cluster

> 
> Hi 
> 
> I have launched my cluster and I am trying to submit my application to run on
> cluster but its not allowing me to connect . It prompts  the following error
> "Master endpoint 
> spark://ec2-54-187-59-117.us-west-2.compute.amazonaws.com:7077
>   was not a
> REST server." The command I use to run my application on cluster is
> 
> " /spark-1.6.1/bin/spark-submit --master spark://ec2-54-200-193-107.us-west-
> 2.compute.amazonaws.com:7077 
> --deploy-mode cluster --class BFS target/scala-
> 2.10/scalaexample_2.10-1.0.jar "
> 
> Am i missing something ? Your help will be highly appreciated .
> 
> P.S  I have even tried adding inbound rule to my master node but still no
> success.
> 
> Thanks 




Re: Increasing spark.yarn.executor.memoryOverhead degrades performance

2016-07-18 Thread Sean Owen
Possibilities:

- You are using more memory now (and not getting killed), but now are
exceeding OS memory and are swapping
- Your heap sizes / config aren't quite right and now, instead of
failing earlier because YARN killed the job, you're running normally
but seeing a lot of time lost to GC thrashing

Based on your description I suspect the first one. Disable swap in
general on cluster machines.

On Mon, Jul 18, 2016 at 4:47 PM, Sunita Arvind  wrote:
> Hello Experts,
>
> For one of our streaming appilcation, we intermittently saw:
>
> WARN yarn.YarnAllocator: Container killed by YARN for exceeding memory
> limits. 12.0 GB of 12 GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.
>
> Based on what I found on internet and the error message, I increased the
> memoryOverhead to 768. This is actually slowing the application. We are on
> spark1.3, so not sure if its due to any GC pauses. Just to do some
> intelligent trials, I wanted to understand what could be causing the
> degrade. Should I increase driver memoryOverhead also? Another interesting
> observation is, bringing down the executor memory to 5GB with executor
> memoryOverhead to 768 showed significant performance gains. What are the
> other associated settings?
>
> regards
> Sunita
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Increasing spark.yarn.executor.memoryOverhead degrades performance

2016-07-18 Thread Sunita Arvind
Hello Experts,

For one of our streaming applications, we intermittently saw:

WARN yarn.YarnAllocator: Container killed by YARN for exceeding memory
limits. 12.0 GB of 12 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.

Based on what I found on internet and the error message, I increased the
memoryOverhead to 768. This is actually slowing the application. We are on
spark1.3, so not sure if its due to any GC pauses. Just to do some
intelligent trials, I wanted to understand what could be causing the
degrade. Should I increase driver memoryOverhead also? Another interesting
observation is, bringing down the executor memory to 5GB with executor
memoryOverhead to 768 showed significant performance gains. What are the
other associated settings?

regards
Sunita
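
For reference, a minimal sketch of how the settings discussed in this thread
can be expressed (assuming YARN and the Spark 1.x property names); the values
are simply the ones mentioned above, not tuning recommendations.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "5g")                 // executor heap, as in the experiment above
  .set("spark.yarn.executor.memoryOverhead", "768")   // off-heap overhead per executor, in MB
  .set("spark.yarn.driver.memoryOverhead", "768")     // driver-side counterpart, if it turns out to be needed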


transtition SQLContext to SparkSession

2016-07-18 Thread Koert Kuipers
In my codebase I would like to gradually transition to SparkSession, so
while I start using SparkSession I also want a SQLContext to be available
as before (but with a deprecation warning when I use it). This should be
easy since SQLContext is now a wrapper for SparkSession.

so basically:
val session = SparkSession.builder.set(..., ...).getOrCreate()
val sqlc = new SQLContext(session)

However, this doesn't work: the SQLContext constructor I am trying to use is
private. SparkSession.sqlContext is also private.

Am I missing something?

A non-gradual switch is not very realistic in any significant codebase, and
I do not want to create a SparkSession and a SQLContext independently (both
from the same SparkContext), since that can only lead to confusion and
inconsistent settings.
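
For reference, a sketch of the transition being discussed, under the
assumption of a Spark 2.0.0+ build in which SparkSession.sqlContext has been
made public (the pull request referenced in the replies earlier in this
digest); the app name and config key are hypothetical.

import org.apache.spark.sql.SparkSession

val session = SparkSession.builder
  .appName("legacy-compat")                 // hypothetical application name
  .config("spark.some.option", "value")     // hypothetical setting
  .getOrCreate()

// legacy code keeps receiving a SQLContext, backed by the same session,
// so there is no risk of inconsistent settings
val sqlc = session.sqlContext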


Re: Dynamically get value based on Map key in Spark Dataframe

2016-07-18 Thread Jacek Laskowski
See broadcast variable.

Or (just a thought) do join between DataFrames.

Jacek

On 18 Jul 2016 9:24 a.m., "Divya Gehlot"  wrote:

> Hi,
>
> I have created a map by reading a text file
> val keyValueMap = file_read.map(t => t.getString(0) ->
> t.getString(4)).collect().toMap
>
> Now I have another dataframe where I need to dynamically replace all the
> keys of Map with values
> val df_input = reading the file as dataframe
> val df_replacekeys =
> df_input.withColumn("map_values",lit(keyValueMap (col("key"
>
> Would really appreciate the help .
>
>
> Thanks,
> Divya
>
>
>


Re: Custom Spark Error on Hadoop Cluster

2016-07-18 Thread Xiangrui Meng
Glad to hear. Could you please share your solution on the user mailing
list? -Xiangrui

On Mon, Jul 18, 2016 at 2:26 AM Alger Remirata 
wrote:

> Hi Xiangrui,
>
> We have now solved the problem. Thanks for all the tips you've given.
>
> Best Regards,
>
> Alger
>
> On Thu, Jul 14, 2016 at 2:43 AM, Alger Remirata 
> wrote:
>
>> By the using cloudera manager for standalone cluster manager
>>
>> On Thu, Jul 14, 2016 at 2:20 AM, Alger Remirata 
>> wrote:
>>
>>> It looks like there are a lot of people already having posted on
>>> classNotFoundError on the cluster mode fro version 1.5.1.
>>>
>>> https://www.mail-archive.com/user@spark.apache.org/msg43089.html
>>>
>>>
>>>
>>> On Thu, Jul 14, 2016 at 12:45 AM, Alger Remirata >> > wrote:
>>>
 Hi Xiangrui,

 I check all the nodes of the cluster. It is working locally on each
 node but there's an error upon deploying it on the cluster itself. I don't
 know why it is and still don't understand why on individual node, it is
 working locally but when deployed to hadoop cluster, it gets the error
 mentioned.

 Thanks,

 Alger

 On Wed, Jul 13, 2016 at 4:38 AM, Alger Remirata  wrote:

> Since we're using mvn to build, it looks like mvn didn't add the
> class. Is there something on pom.xml to be added so that the new class can
> be recognized?
>
> On Wed, Jul 13, 2016 at 4:21 AM, Alger Remirata <
> abremirat...@gmail.com> wrote:
>
>> Thanks for the reply however I couldn't locate the MLlib jar. What I
>> have is a fat 'spark-assembly-1.5.1-hadoop2.6.0.jar'.
>>
>> There's an error on me copying user@spark.apache.org. The message
>> suddently is not sent when I do that.
>>
>> On Wed, Jul 13, 2016 at 4:13 AM, Alger Remirata <
>> abremirat...@gmail.com> wrote:
>>
>>> Thanks for the reply however I couldn't locate the MLlib jar. What I
>>> have is a fat 'spark-assembly-1.5.1-hadoop2.6.0.jar'.
>>>
>>> On Tue, Jul 12, 2016 at 3:23 AM, Xiangrui Meng 
>>> wrote:
>>>
 (+user@spark. Please copy user@ so other people could see and
 help.)

 The error message means you have an MLlib jar on the classpath but
 it didn't contain ALS$StandardNNLSSolver. So it is either the
 modified jar not deployed to the workers or there existing an 
 unmodified
 MLlib jar sitting in front of the modified one on the classpath. You 
 can
 check the worker logs and see the classpath used in launching the 
 worker,
 and then check the MLlib jars on that classpath. -Xiangrui

 On Sun, Jul 10, 2016 at 10:18 PM Alger Remirata <
 abremirat...@gmail.com> wrote:

> Hi Xiangrui,
>
> We have the modified jars deployed both on master and slave nodes.
>
> What do you mean by this line?: 1. The unmodified Spark jars were
> not on the classpath (already existed on the cluster or pulled in by 
> other
> packages).
>
> How would I check that the unmodified Spark jars are not on the
> classpath? We change entirely the contents of the directory for 
> SPARK_HOME.
> The newly built customized spark is the new contents of the current
> SPARK_HOME we have right now.
>
> Thanks,
>
> Alger
>
> On Fri, Jul 8, 2016 at 1:32 PM, Xiangrui Meng  > wrote:
>
>> This seems like a deployment or dependency issue. Please check
>> the following:
>> 1. The unmodified Spark jars were not on the classpath (already
>> existed on the cluster or pulled in by other packages).
>> 2. The modified jars were indeed deployed to both master and
>> slave nodes.
>>
>> On Tue, Jul 5, 2016 at 12:29 PM Alger Remirata <
>> abremirat...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> First of all, we like to thank you for developing spark. This
>>> helps us a lot on our data science task.
>>>
>>> I have a question. We have build a customized spark using the
>>> following command:
>>> mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
>>> -Phive-thriftserver -DskipTests clean package.
>>>
>>> On the custom spark we built, we've added a new scala file or
>>> package called StandardNNLS file however it got an error saying:
>>>
>>> Name: org.apache.spark.SparkException
>>> Message: Job aborted due to stage failure: Task 21 in stage 34.0
>>> failed 4 times, most recent failure: Lost task 21.3 in stage 34.0 
>>> (TID
>>> 2547, 192.168.60.115): 

Re: Question About OFF_HEAP Caching

2016-07-18 Thread Gene Pang
Hi,

If you want to use Alluxio with Spark 2.x, it is recommended to write to
and read from Alluxio with files. You can save an RDD with saveAsObjectFile
with an Alluxio path (alluxio://host:port/path/to/file), and you can read
that file from any other Spark job. Here is additional information on how
to run Spark with Alluxio:
http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html

Hope that helps,
Gene
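
For illustration, a minimal sketch of the file-based approach described
above; the Alluxio master address, port and path are hypothetical, and
saveAsObjectFile/objectFile are the standard RDD and SparkContext methods,
so any other Spark job can read the result back.

val rdd = sc.parallelize(1 to 1000)

// write through Alluxio instead of persisting with an OFF_HEAP storage level
rdd.saveAsObjectFile("alluxio://alluxio-master:19998/tmp/my-rdd")

// any later job (or a different application) can restore it from the same path
val restored = sc.objectFile[Int]("alluxio://alluxio-master:19998/tmp/my-rdd")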

On Mon, Jul 18, 2016 at 12:11 AM, condor join 
wrote:

> Hi All,
>
> I have some questions about OFF_HEAP caching. In Spark 1.x, when we use
> rdd.persist(StorageLevel.OFF_HEAP), that means the RDD is cached in
> Tachyon (Alluxio). However, in Spark 2.x we can directly use OFF_HEAP for
> caching
> (
> https://issues.apache.org/jira/browse/SPARK-13992?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22off-heap%20caching%22).
> I am confused about this and I have the following questions:
>
> 1. In Spark 2.x, how should we use Tachyon for caching?
>
> 2. Is there any reason that it must change in this way (I mean using
> OFF_HEAP directly instead of using Tachyon)?
>
> Thanks a lot!
>
>
>
>


Re: Concatenate the columns in dataframe to create new collumns using Java

2016-07-18 Thread Abhishek Anand
Thanks Nihed.

I was able to do this in exactly the same way.


Cheers!!
Abhi

On Mon, Jul 18, 2016 at 5:56 PM, nihed mbarek  wrote:

> and if we have this static method
> df.show();
> Column c = concatFunction(df, "l1", "firstname,lastname");
> df.select(c).show();
>
> with this code :
> Column concatFunction(DataFrame df, String fieldName, String columns) {
> String[] array = columns.split(",");
> Column[] concatColumns = new Column[array.length];
> for (int i = 0; i < concatColumns.length; i++) {
> concatColumns[i]=df.col(array[i]);
> }
>
> return functions.concat(concatColumns).alias(fieldName);
> }
>
>
>
> On Mon, Jul 18, 2016 at 2:14 PM, Abhishek Anand 
> wrote:
>
>> Hi Nihed,
>>
>> Thanks for the reply.
>>
>> I am looking for something like this :
>>
>> DataFrame training = orgdf.withColumn("I1",
>> functions.concat(orgdf.col("C0"),orgdf.col("C1")));
>>
>>
>> Here I have to give C0 and C1 columns, I am looking to write a generic
>> function that concatenates the columns depending on input columns.
>>
>> like if I have something
>> String str = "C0,C1,C2"
>>
>> Then it should work as
>>
>> DataFrame training = orgdf.withColumn("I1",
>> functions.concat(orgdf.col("C0"),orgdf.col("C1"),orgdf.col("C2")));
>>
>>
>>
>> Thanks,
>> Abhi
>>
>> On Mon, Jul 18, 2016 at 4:39 PM, nihed mbarek  wrote:
>>
>>> Hi,
>>>
>>>
>>> I just wrote this code to help you. Is it what you need ??
>>>
>>>
>>> SparkConf conf = new
>>> SparkConf().setAppName("hello").setMaster("local");
>>> JavaSparkContext sc = new JavaSparkContext(conf);
>>> SQLContext sqlContext = new SQLContext(sc);
>>> List persons = new ArrayList<>();
>>> persons.add(new Person("nihed", "mbarek", "nihed.com"));
>>> persons.add(new Person("mark", "zuckerberg", "facebook.com"));
>>>
>>> DataFrame df = sqlContext.createDataFrame(persons, Person.class);
>>>
>>> df.show();
>>> final String[] columns = df.columns();
>>> Column[] selectColumns = new Column[columns.length + 1];
>>> for (int i = 0; i < columns.length; i++) {
>>> selectColumns[i]=df.col(columns[i]);
>>> }
>>>
>>>
>>> selectColumns[columns.length]=functions.concat(df.col("firstname"),
>>> df.col("lastname"));
>>>
>>> df.select(selectColumns).show();
>>>   ---
>>> public static class Person {
>>>
>>> private String firstname;
>>> private String lastname;
>>> private String address;
>>> }
>>>
>>>
>>>
>>> Regards,
>>>
>>> On Mon, Jul 18, 2016 at 12:45 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
 Hi,

 I have a dataframe say having C0,C1,C2 and so on as columns.

 I need to create interaction variables to be taken as input for my
 program.

 For eg -

 I need to create I1 as concatenation of C0,C3,C5

 Similarly, I2  = concat(C4,C5)

 and so on ..


 How can I achieve this in my Java code for concatenation of any columns
 given input by the user.

 Thanks,
 Abhi

>>>
>>>
>>>
>>> --
>>>
>>> M'BAREK Med Nihed,
>>> Fedora Ambassador, TUNISIA, Northern Africa
>>> http://www.nihed.com
>>>
>>> 
>>>
>>>
>>
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> 
>
>


Re: Concatenate the columns in dataframe to create new collumns using Java

2016-07-18 Thread nihed mbarek
and if we have this static method
df.show();
Column c = concatFunction(df, "l1", "firstname,lastname");
df.select(c).show();

with this code :
Column concatFunction(DataFrame df, String fieldName, String columns) {
    String[] array = columns.split(",");
    Column[] concatColumns = new Column[array.length];
    for (int i = 0; i < concatColumns.length; i++) {
        concatColumns[i] = df.col(array[i]);
    }
    return functions.concat(concatColumns).alias(fieldName);
}



On Mon, Jul 18, 2016 at 2:14 PM, Abhishek Anand 
wrote:

> Hi Nihed,
>
> Thanks for the reply.
>
> I am looking for something like this :
>
> DataFrame training = orgdf.withColumn("I1",
> functions.concat(orgdf.col("C0"),orgdf.col("C1")));
>
>
> Here I have to give C0 and C1 columns, I am looking to write a generic
> function that concatenates the columns depending on input columns.
>
> like if I have something
> String str = "C0,C1,C2"
>
> Then it should work as
>
> DataFrame training = orgdf.withColumn("I1",
> functions.concat(orgdf.col("C0"),orgdf.col("C1"),orgdf.col("C2")));
>
>
>
> Thanks,
> Abhi
>
> On Mon, Jul 18, 2016 at 4:39 PM, nihed mbarek  wrote:
>
>> Hi,
>>
>>
>> I just wrote this code to help you. Is it what you need ??
>>
>>
>> SparkConf conf = new
>> SparkConf().setAppName("hello").setMaster("local");
>> JavaSparkContext sc = new JavaSparkContext(conf);
>> SQLContext sqlContext = new SQLContext(sc);
>> List persons = new ArrayList<>();
>> persons.add(new Person("nihed", "mbarek", "nihed.com"));
>> persons.add(new Person("mark", "zuckerberg", "facebook.com"));
>>
>> DataFrame df = sqlContext.createDataFrame(persons, Person.class);
>>
>> df.show();
>> final String[] columns = df.columns();
>> Column[] selectColumns = new Column[columns.length + 1];
>> for (int i = 0; i < columns.length; i++) {
>> selectColumns[i]=df.col(columns[i]);
>> }
>>
>>
>> selectColumns[columns.length]=functions.concat(df.col("firstname"),
>> df.col("lastname"));
>>
>> df.select(selectColumns).show();
>>   ---
>> public static class Person {
>>
>> private String firstname;
>> private String lastname;
>> private String address;
>> }
>>
>>
>>
>> Regards,
>>
>> On Mon, Jul 18, 2016 at 12:45 PM, Abhishek Anand > > wrote:
>>
>>> Hi,
>>>
>>> I have a dataframe say having C0,C1,C2 and so on as columns.
>>>
>>> I need to create interaction variables to be taken as input for my
>>> program.
>>>
>>> For eg -
>>>
>>> I need to create I1 as concatenation of C0,C3,C5
>>>
>>> Similarly, I2  = concat(C4,C5)
>>>
>>> and so on ..
>>>
>>>
>>> How can I achieve this in my Java code for concatenation of any columns
>>> given input by the user.
>>>
>>> Thanks,
>>> Abhi
>>>
>>
>>
>>
>> --
>>
>> M'BAREK Med Nihed,
>> Fedora Ambassador, TUNISIA, Northern Africa
>> http://www.nihed.com
>>
>> 
>>
>>
>


-- 

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com




Re: Concatenate the columns in dataframe to create new collumns using Java

2016-07-18 Thread Abhishek Anand
Hi Nihed,

Thanks for the reply.

I am looking for something like this :

DataFrame training = orgdf.withColumn("I1",
functions.concat(orgdf.col("C0"),orgdf.col("C1")));


Here I have to give C0 and C1 columns, I am looking to write a generic
function that concatenates the columns depending on input columns.

like if I have something
String str = "C0,C1,C2"

Then it should work as

DataFrame training = orgdf.withColumn("I1",
functions.concat(orgdf.col("C0"),orgdf.col("C1"),orgdf.col("C2")));



Thanks,
Abhi

On Mon, Jul 18, 2016 at 4:39 PM, nihed mbarek  wrote:

> Hi,
>
>
> I just wrote this code to help you. Is it what you need ??
>
>
> SparkConf conf = new
> SparkConf().setAppName("hello").setMaster("local");
> JavaSparkContext sc = new JavaSparkContext(conf);
> SQLContext sqlContext = new SQLContext(sc);
> List persons = new ArrayList<>();
> persons.add(new Person("nihed", "mbarek", "nihed.com"));
> persons.add(new Person("mark", "zuckerberg", "facebook.com"));
>
> DataFrame df = sqlContext.createDataFrame(persons, Person.class);
>
> df.show();
> final String[] columns = df.columns();
> Column[] selectColumns = new Column[columns.length + 1];
> for (int i = 0; i < columns.length; i++) {
> selectColumns[i]=df.col(columns[i]);
> }
>
>
> selectColumns[columns.length]=functions.concat(df.col("firstname"),
> df.col("lastname"));
>
> df.select(selectColumns).show();
>   ---
> public static class Person {
>
> private String firstname;
> private String lastname;
> private String address;
> }
>
>
>
> Regards,
>
> On Mon, Jul 18, 2016 at 12:45 PM, Abhishek Anand 
> wrote:
>
>> Hi,
>>
>> I have a dataframe say having C0,C1,C2 and so on as columns.
>>
>> I need to create interaction variables to be taken as input for my
>> program.
>>
>> For eg -
>>
>> I need to create I1 as concatenation of C0,C3,C5
>>
>> Similarly, I2  = concat(C4,C5)
>>
>> and so on ..
>>
>>
>> How can I achieve this in my Java code for concatenation of any columns
>> given input by the user.
>>
>> Thanks,
>> Abhi
>>
>
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> 
>
>


Re: Inode for STS

2016-07-18 Thread ayan guha
Hi

Thanks for this. However, I am interested in regular deletion of the temp
files while the server is up. Additionally, the link says it is not of use
for a multi-user environment. Any other idea? Is there any variation of
cleaner.ttl?

On Mon, Jul 18, 2016 at 8:00 PM, Chanh Le  wrote:

> Hi Ayan,
> I seem like you mention this
> https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.start.cleanup.scratchdir
> Default it was set false by default.
>
>
>
>
>
> On Jul 13, 2016, at 5:01 PM, ayan guha  wrote:
>
> Thanks Christophe. Any comment from Spark dev community member would
> really helpful on the Jira.
>
> What I saw today is shutting down the thrift server process lead to a
> clean up. Also, we started removing any empty folders from /tmp. Is there
> any other or better method?
>
> On Wed, Jul 13, 2016 at 5:25 PM, Christophe Préaud <
> christophe.pre...@kelkoo.com> wrote:
>
>> Hi Ayan,
>>
>> I have opened a JIRA about this issues, but there are no answer so far:
>> SPARK-15401 
>>
>> Regards,
>> Christophe.
>>
>>
>> On 13/07/16 05:54, ayan guha wrote:
>>
>> Hi
>>
>> We are running Spark Thrift Server as a long running application.
>> However,  it looks like it is filling up /tmp/hive folder with lots of
>> small files and directories with no file in them, blowing out inode limit
>> and preventing any connection with "No Space Left in Device" issue.
>>
>> What is the best way to clean up those small files periodically?
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>> --
>> Kelkoo SAS
>> Société par Actions Simplifiée (simplified joint-stock company)
>> Share capital: €4,168,964.30
>> Registered office: 158 Ter Rue du Temple, 75003 Paris
>> 425 093 069 RCS Paris
>>
>> This message and its attachments are confidential and intended solely for
>> their addressees. If you are not the intended recipient of this message,
>> please delete it and notify the sender.
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Concatenate the columns in dataframe to create new collumns using Java

2016-07-18 Thread nihed mbarek
Hi,


I just wrote this code to help you. Is it what you need ??


SparkConf conf = new SparkConf().setAppName("hello").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
List<Person> persons = new ArrayList<>();
persons.add(new Person("nihed", "mbarek", "nihed.com"));
persons.add(new Person("mark", "zuckerberg", "facebook.com"));

DataFrame df = sqlContext.createDataFrame(persons, Person.class);

df.show();
final String[] columns = df.columns();
Column[] selectColumns = new Column[columns.length + 1];
for (int i = 0; i < columns.length; i++) {
    selectColumns[i] = df.col(columns[i]);
}

// append the concatenated column after the original columns
selectColumns[columns.length] = functions.concat(df.col("firstname"),
    df.col("lastname"));

df.select(selectColumns).show();
---
public static class Person {

    private String firstname;
    private String lastname;
    private String address;
    // a matching 3-arg constructor and bean-style getters (omitted here) are
    // needed so that createDataFrame can infer the schema from the class
}



Regards,

On Mon, Jul 18, 2016 at 12:45 PM, Abhishek Anand 
wrote:

> Hi,
>
> I have a dataframe say having C0,C1,C2 and so on as columns.
>
> I need to create interaction variables to be taken as input for my
> program.
>
> For eg -
>
> I need to create I1 as concatenation of C0,C3,C5
>
> Similarly, I2  = concat(C4,C5)
>
> and so on ..
>
>
> How can I achieve this in my Java code for concatenation of any columns
> given input by the user.
>
> Thanks,
> Abhi
>



-- 

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com




Re: Spark driver getting out of memory

2016-07-18 Thread Saurav Sinha
I have set --driver-memory 5g. I need to understand whether driver memory
needs to be increased as the number of partitions increases. What would be
the best ratio of number of partitions to driver memory?

On Mon, Jul 18, 2016 at 4:07 PM, Zhiliang Zhu  wrote:

> try to set --driver-memory xg , x would be as large as can be set .
>
>
> On Monday, July 18, 2016 6:31 PM, Saurav Sinha 
> wrote:
>
>
> Hi,
>
> I am running spark job.
>
> Master memory - 5G
> executor memort 10G(running on 4 node)
>
> My job is getting killed as no of partition increase to 20K.
>
> 16/07/18 14:53:13 INFO DAGScheduler: Got job 17 (foreachPartition at
> WriteToKafka.java:45) with 13524 output partitions (allowLocal=false)
> 16/07/18 14:53:13 INFO DAGScheduler: Final stage: ResultStage
> 640(foreachPartition at WriteToKafka.java:45)
> 16/07/18 14:53:13 INFO DAGScheduler: Parents of final stage:
> List(ShuffleMapStage 518, ShuffleMapStage 639)
> 16/07/18 14:53:23 INFO DAGScheduler: Missing parents: List()
> 16/07/18 14:53:23 INFO DAGScheduler: Submitting ResultStage 640
> (MapPartitionsRDD[271] at map at BuildSolrDocs.java:209), which has no
> missing
> parents
> 16/07/18 14:53:23 INFO MemoryStore: ensureFreeSpace(8248) called with
> curMem=41923262, maxMem=2778778828
> 16/07/18 14:53:23 INFO MemoryStore: Block broadcast_90 stored as values in
> memory (estimated size 8.1 KB, free 2.5 GB)
> Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError:
> Java heap space
> at
> org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
> at
> org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
> at
> org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:294)
> at
> org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:273)
> at
> org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:197)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
>
>
> Help needed.
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
>
>
>


-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062


Concatenate the columns in dataframe to create new collumns using Java

2016-07-18 Thread Abhishek Anand
Hi,

I have a dataframe say having C0,C1,C2 and so on as columns.

I need to create interaction variables to be taken as input for my program.

For eg -

I need to create I1 as concatenation of C0,C3,C5

Similarly, I2  = concat(C4,C5)

and so on ..


How can I achieve this in my Java code for concatenation of any columns
given input by the user.

Thanks,
Abhi


Re: Spark driver getting out of memory

2016-07-18 Thread Zhiliang Zhu
try to set --driver-memory xg, x would be as large as can be set.

On Monday, July 18, 2016 6:31 PM, Saurav Sinha  wrote:

Hi,

I am running spark job.

Master memory - 5G
executor memory 10G (running on 4 node)

My job is getting killed as no of partition increase to 20K.

16/07/18 14:53:13 INFO DAGScheduler: Got job 17 (foreachPartition at
WriteToKafka.java:45) with 13524 output partitions (allowLocal=false)
16/07/18 14:53:13 INFO DAGScheduler: Final stage: ResultStage
640(foreachPartition at WriteToKafka.java:45)
16/07/18 14:53:13 INFO DAGScheduler: Parents of final stage:
List(ShuffleMapStage 518, ShuffleMapStage 639)
16/07/18 14:53:23 INFO DAGScheduler: Missing parents: List()
16/07/18 14:53:23 INFO DAGScheduler: Submitting ResultStage 640
(MapPartitionsRDD[271] at map at BuildSolrDocs.java:209), which has no
missing parents
16/07/18 14:53:23 INFO MemoryStore: ensureFreeSpace(8248) called with
curMem=41923262, maxMem=2778778828
16/07/18 14:53:23 INFO MemoryStore: Block broadcast_90 stored as values in
memory (estimated size 8.1 KB, free 2.5 GB)
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError:
Java heap space
at
org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
at
org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
at
org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:294)
at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:273)
at
org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:197)
at
java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)


Help needed.

--
Thanks and Regards,

Saurav Sinha

Contact: 9742879062

Re: the spark job is so slow - almost frozen

2016-07-18 Thread Zhiliang Zhu
Thanks a lot for your reply.
In effect, we tried to run the SQL on Kettle, Hive and Spark Hive (via HiveContext) respectively, and in each case the job seems to freeze before it finishes.
Across the 6 tables we need to read different columns from the different tables for the specific information, then do some simple calculation before the output. Join operations are used the most in the SQL.
Best wishes!

 

On Monday, July 18, 2016 6:24 PM, Chanh Le  wrote:
 

 Hi,
What about the network (bandwidth) between Hive and Spark?
Did it run in Hive before you moved it to Spark?
Because it's complex, you can use something like the EXPLAIN command to show what is going on.



 
On Jul 18, 2016, at 5:20 PM, Zhiliang Zhu  wrote:
the SQL logic in the program is quite complex, so I will not describe the detailed code here.

On Monday, July 18, 2016 6:04 PM, Zhiliang Zhu 
 wrote:
 

 Hi All,

Here we have one application. It needs to extract different columns from 6 Hive tables and then do some easy calculation; there are around 100,000 rows in each table. Finally it needs to output another table or file (with a consistent column format).

However, after many days of trying, the Spark Hive job is unthinkably slow - sometimes almost frozen. There are 5 nodes in the Spark cluster.

Could anyone offer some help? Any idea or clue would also be good.

Thanks in advance~
Zhiliang

   



  

Spark driver getting out of memory

2016-07-18 Thread Saurav Sinha
Hi,

I am running spark job.

Master memory - 5G
executor memort 10G(running on 4 node)

My job is getting killed as no of partition increase to 20K.

16/07/18 14:53:13 INFO DAGScheduler: Got job 17 (foreachPartition at
WriteToKafka.java:45) with 13524 output partitions (allowLocal=false)
16/07/18 14:53:13 INFO DAGScheduler: Final stage: ResultStage
640(foreachPartition at WriteToKafka.java:45)
16/07/18 14:53:13 INFO DAGScheduler: Parents of final stage:
List(ShuffleMapStage 518, ShuffleMapStage 639)
16/07/18 14:53:23 INFO DAGScheduler: Missing parents: List()
16/07/18 14:53:23 INFO DAGScheduler: Submitting ResultStage 640
(MapPartitionsRDD[271] at map at BuildSolrDocs.java:209), which has no
missing
parents
16/07/18 14:53:23 INFO MemoryStore: ensureFreeSpace(8248) called with
curMem=41923262, maxMem=2778778828
16/07/18 14:53:23 INFO MemoryStore: Block broadcast_90 stored as values in
memory (estimated size 8.1 KB, free 2.5 GB)
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError:
Java heap space
at
org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
at
org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
at
org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:294)
at
org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:273)
at
org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:197)
at
java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)


Help needed.

-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062


Re: the spark job is so slow - almost frozen

2016-07-18 Thread Chanh Le
Hi,
What about the network (bandwidth) between Hive and Spark?
Did it run in Hive before you moved it to Spark?
Because it's complex, you can use something like the EXPLAIN command to show what is going on.
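For example, a sketch of how to inspect the plan from a HiveContext (Scala; the table and column names are illustrative only):

// Assuming hiveContext is an existing org.apache.spark.sql.hive.HiveContext
val df = hiveContext.sql(
  "SELECT a.id, b.value FROM table_a a JOIN table_b b ON a.id = b.id")
df.explain(true)  // extended output: parsed, analyzed, optimized and physical plans

// The EXPLAIN statement can also be run directly as SQL
val plan = hiveContext.sql("EXPLAIN EXTENDED SELECT a.id, b.value FROM table_a a JOIN table_b b ON a.id = b.id")
plan.collect().foreach(println)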




 
> On Jul 18, 2016, at 5:20 PM, Zhiliang Zhu  wrote:
> 
> the SQL logic in the program is quite complex, so I will not describe the
> detailed code here.
> 
> 
> On Monday, July 18, 2016 6:04 PM, Zhiliang Zhu  
> wrote:
> 
> 
> Hi All,
> 
> Here we have one application. It needs to extract different columns from 6
> Hive tables and then do some easy calculation; there are around 100,000
> rows in each table.
> Finally it needs to output another table or file (with a consistent
> column format).
> 
> However, after many days of trying, the Spark Hive job is unthinkably slow -
> sometimes almost frozen. There are 5 nodes in the Spark cluster.
> 
> Could anyone offer some help? Any idea or clue would also be good.
> 
> Thanks in advance~
> 
> Zhiliang
> 
> 



Re: the spark job is so slow - almost frozen

2016-07-18 Thread Zhiliang Zhu
The SQL logic in the program is quite complex, so I will not describe the detailed code here.

On Monday, July 18, 2016 6:04 PM, Zhiliang Zhu 
 wrote:
 

 Hi All,

Here we have one application. It needs to extract different columns from 6 Hive tables and then do some easy calculation; there are around 100,000 rows in each table. Finally it needs to output another table or file (with a consistent column format).

However, after many days of trying, the Spark Hive job is unthinkably slow - sometimes almost frozen. There are 5 nodes in the Spark cluster.

Could anyone offer some help? Any idea or clue would also be good.

Thanks in advance~
Zhiliang

  

Re: Re: how to tuning spark shuffle

2016-07-18 Thread lizhenm...@163.com
Hi,

Can you post the Environment tab from your UI?

By default spark-sql runs in local mode, which means that you only have one
driver and one executor in a single JVM. Did you increase the executor memory,
for example through

SET spark.executor.memory=xG

and then adjust it and run the SQL again?


HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 18 July 2016 at 08:16, leezy  wrote:

> Hi,
> I am running a join operation in spark-sql, but when I increase the
> executor memory, the run time becomes longer. In the Spark UI, I can see that
> the shuffle becomes slower as the memory gets bigger. How can I tune
> it?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-tuning-spark-shuffle-tp27350.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


the spark job is so slow - almost frozen

2016-07-18 Thread Zhiliang Zhu
Hi All,

Here we have one application. It needs to extract different columns from 6 Hive tables and then do some easy calculation; there are around 100,000 rows in each table. Finally it needs to output another table or file (with a consistent column format).

However, after many days of trying, the Spark Hive job is unthinkably slow - sometimes almost frozen. There are 5 nodes in the Spark cluster.

Could anyone offer some help? Any idea or clue would also be good.

Thanks in advance~
Zhiliang

Re: Inode for STS

2016-07-18 Thread Chanh Le
Hi Ayan,
It seems you are referring to this:
https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.start.cleanup.scratchdir

It is set to false by default.





> On Jul 13, 2016, at 5:01 PM, ayan guha  wrote:
> 
> Thanks Christophe. Any comment from a Spark dev community member would be really
> helpful on the Jira.
> 
> What I saw today is that shutting down the Thrift Server process leads to a
> cleanup. Also, we started removing any empty folders from /tmp. Is there any
> other or better method? 
> 
> On Wed, Jul 13, 2016 at 5:25 PM, Christophe Préaud 
> > wrote:
> Hi Ayan,
> 
> I have opened a JIRA about this issue, but there is no answer so far: 
> SPARK-15401 
> 
> Regards,
> Christophe.
> 
> 
> On 13/07/16 05:54, ayan guha wrote:
>> Hi
>> 
>> We are running Spark Thrift Server as a long running application. However,  
>> it looks like it is filling up /tmp/hive folder with lots of small files and 
>> directories with no file in them, blowing out inode limit and preventing any 
>> connection with "No Space Left in Device" issue. 
>> 
>> What is the best way to clean up those small files periodically? 
>> 
>> -- 
>> Best Regards,
>> Ayan Guha
> 
> 
> Kelkoo SAS
> Société par Actions Simplifiée (simplified joint-stock company)
> Share capital: €4,168,964.30
> Registered office: 158 Ter Rue du Temple, 75003 Paris
> 425 093 069 RCS Paris
> 
> This message and its attachments are confidential and intended exclusively
> for their addressees. If you are not the intended recipient of this message,
> please delete it and notify the sender.
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha



pyspark 1.5.0 save model ?

2016-07-18 Thread pseudo oduesp
Hi,
How can I save a model under PySpark 1.5.0?
I use RandomForestClassifier().
Thanks in advance.


Re: how to tuning spark shuffle

2016-07-18 Thread Mich Talebzadeh
Hi,

Can you post the Environment tab from your UI?

By default spark-sql runs in local mode, which means that you only have one
driver and one executor in a single JVM. Did you increase the executor memory,
for example through

SET spark.executor.memory=xG

and then adjust it and run the SQL again?
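For completeness, a rough Scala sketch of the same idea applied when building your own application instead of using the spark-sql shell (all values are illustrative; spark.sql.shuffle.partitions is an extra knob that often matters for join-heavy queries):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sparkConf = new SparkConf()
  .setAppName("join-tuning")                    // illustrative application name
  .set("spark.executor.memory", "4g")           // per-executor heap
  .set("spark.sql.shuffle.partitions", "200")   // shuffle parallelism for joins/aggregations

val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)

hiveContext.sql("SET spark.sql.shuffle.partitions=400")  // can also be adjusted per session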


HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 18 July 2016 at 08:16, leezy  wrote:

> Hi,
> I am running a join operation in spark-sql, but when I increase the
> executor memory, the run time becomes longer. In the Spark UI, I can see that
> the shuffle becomes slower as the memory gets bigger. How can I tune
> it?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-tuning-spark-shuffle-tp27350.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Dynamically get value based on Map key in Spark Dataframe

2016-07-18 Thread Divya Gehlot
Hi,

I have created a map by reading a text file
val keyValueMap = file_read.map(t => t.getString(0) ->
t.getString(4)).collect().toMap

Now I have another dataframe where I need to dynamically replace all the
keys of Map with values
val df_input = reading the file as dataframe
val df_replacekeys =
df_input.withColumn("map_values",lit(keyValueMap (col("key"

Would really appreciate the help .


Thanks,
Divya
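One common way to do this is to broadcast the collected map and wrap the lookup in a UDF so it is applied per row; a minimal Scala sketch reusing the names from the question (the column name "key" is illustrative):

import org.apache.spark.sql.functions.{col, udf}

val keyValueMap = file_read.map(t => t.getString(0) -> t.getString(4)).collect().toMap
val mapBroadcast = sc.broadcast(keyValueMap)   // broadcast so the map is shipped to executors only once

// Option result: rows whose key is missing from the map get null
val lookupValue = udf((k: String) => mapBroadcast.value.get(k))

val df_replacekeys = df_input.withColumn("map_values", lookupValue(col("key")))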


how to tuning spark shuffle

2016-07-18 Thread leezy
Hi,
I am running a join operation in spark-sql, but when I increase the
executor memory, the run time becomes longer. In the Spark UI, I can see that
the shuffle becomes slower as the memory gets bigger. How can I tune
it?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-tuning-spark-shuffle-tp27350.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Job trigger in production

2016-07-18 Thread Jagat Singh
You can use the following options:

* spark-submit from the shell
* some kind of job server. See spark-jobserver for details
* a notebook environment. See Zeppelin, for example





On 18 July 2016 at 17:13, manish jaiswal  wrote:

> Hi,
>
>
> What is the best approach to trigger spark job in production cluster?
>


Spark Job trigger in production

2016-07-18 Thread manish jaiswal
Hi,


What is the best approach to trigger spark job in production cluster?


Question About OFF_HEAP Caching

2016-07-18 Thread condor join
Hi All,

I have some questions about OFF_HEAP caching. In Spark 1.x, when we use
rdd.persist(StorageLevel.OFF_HEAP), that means the RDD is cached in Tachyon (Alluxio).
However, in Spark 2.x we can use OFF_HEAP for caching directly

(https://issues.apache.org/jira/browse/SPARK-13992?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22off-heap%20caching%22).
I am confused about this and have the following questions:

1. In Spark 2.x, how should we use Tachyon for caching?

2. Is there any reason it had to change in this way (I mean using OFF_HEAP directly
instead of using Tachyon)?

Thanks a lot!
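For reference, a rough Scala sketch of the two styles being compared (configuration values are illustrative, and the 2.x notes reflect my reading of the linked JIRA rather than an authoritative answer):

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100)   // sc: an existing SparkContext

// Spark 1.x: OFF_HEAP persist delegates to the external block store (Tachyon/Alluxio)
rdd.persist(StorageLevel.OFF_HEAP)

// Spark 2.x: OFF_HEAP storage memory is managed by Spark itself and must be enabled
// and sized explicitly, e.g. (illustrative values)
//   spark.memory.offHeap.enabled=true
//   spark.memory.offHeap.size=2g
// Tachyon/Alluxio would then typically be used as an external filesystem
// (e.g. alluxio:// paths) rather than through StorageLevel.OFF_HEAP.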