Re: Pretty print a dataframe...

2017-02-16 Thread Muthu Jayakumar
This worked. Thanks for the tip Michael.

Thanks,
Muthu

On Thu, Feb 16, 2017 at 12:41 PM, Michael Armbrust 
wrote:

> The toString method of Dataset.queryExecution includes the various plans.
> I usually just log that directly.
>
> On Thu, Feb 16, 2017 at 8:26 AM, Muthu Jayakumar 
> wrote:
>
>> Hello there,
>>
>> I am trying to write to log-line a dataframe/dataset queryExecution
>> and/or its logical plan. The current code...
>>
>> def explain(extended: Boolean): Unit = {
>>   val explain = ExplainCommand(queryExecution.logical, extended = extended)
>>   sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
>>     // scalastyle:off println
>>     r => println(r.getString(0))
>>     // scalastyle:on println
>>   }
>> }
>>
>> sessionState is not accessible if I were to write my own explain(log:
>> LoggingAdapter).
>>
>> Please advise,
>> Muthu
>>
>
>


Spark Worker can't find jar submitted programmatically

2017-02-16 Thread jeremycod
Hi,

I'm trying to create an application that programmatically submits a jar
file to a Spark standalone cluster running on my local PC. However, I always
get the error:

WARN  TaskSetManager:66 - Lost task 1.0 in stage 0.0 (TID 1, 192.168.2.68, executor 0):
java.lang.RuntimeException: Stream '/jars/sample-spark-maven-one-jar.jar' was not found.

I'm creating the SparkContext in the following way:

val sparkConf = new SparkConf()
sparkConf.setMaster("spark://zoran-Latitude-E5420:7077")
sparkConf.set("spark.cores_max", "2")
sparkConf.set("spark.executor.memory", "2g")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.setAppName("Test application")
sparkConf.set("spark.ui.port", "4041")
sparkConf.set("spark.local.ip", "192.168.2.68")
val oneJar = "/samplesparkmaven/target/sample-spark-maven-one-jar.jar"
sparkConf.setJars(List(oneJar))
val sc = new SparkContext(sparkConf)

I'm using Spark 2.1.0 in standalone mode with a master and one worker. Does anyone
have an idea where the problem might be, or how to investigate it further?

Thanks,
Zoran



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Worker-can-t-find-jar-submitted-programmatically-tp28398.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Can't load a RandomForestClassificationModel in Spark job

2017-02-16 Thread Russell Jurney
When you say workers, are you using Spark Streaming? I'm not sure if this
will help, but there is an example of deploying a
RandomForestClassificationModel in Spark Streaming against Kafka that uses
createDataFrame here:
https://github.com/rjurney/Agile_Data_Code_2/blob/master/ch08/make_predictions_streaming.py


I had to create a pyspark.sql.Row in a map operation in an RDD before I
call spark.createDataFrame. Check out lines 92-138.

Not sure if this helps, but I thought I'd give it a try ;)

---
Russell Jurney @rjurney 
russell.jur...@gmail.com LI  FB
 datasyndrome.com

On Tue, Feb 14, 2017 at 4:46 PM, Jianhong Xia  wrote:

> Is there any update on this problem?
>
>
>
> I encountered the same issue that was mentioned here.
>
>
>
> I have CrossValidatorModel.transform(df) running on workers, which
> requires a DataFrame as input. However, we only have Arrays on workers.
> When we deploy our model in cluster mode, we cannot call
> createDataFrame on workers. It gives me this error:
>
>
>
>
>
> 17/02/13 20:21:27 ERROR Detector$: Error while detecting threats
>
> java.lang.NullPointerException
>
>  at org.apache.spark.sql.SparkSession.sessionState$
> lzycompute(SparkSession.scala:111)
>
>  at org.apache.spark.sql.SparkSession.sessionState(
> SparkSession.scala:109)
>
>  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:62)
>
>  at org.apache.spark.sql.SparkSession.createDataFrame(
> SparkSession.scala:270)
>
>  at com.mycompany.analytics.models.app.serializable.
> AppModeler.detection(modeler.scala:370)
>
>
>
>
>
>
>
> On the other hand, if we run in local mode, everything works fine.
>
>
>
> Just want to know if there is any successful case that runs machine
> learning models on the workers.
>
>
>
>
>
> Thanks,
>
> Jianhong
>
>
>
>
>
> *From:* Sumona Routh [mailto:sumos...@gmail.com]
> *Sent:* Thursday, January 12, 2017 6:20 PM
> *To:* ayan guha ; user@spark.apache.org
> *Subject:* Re: Can't load a RandomForestClassificationModel in Spark job
>
>
>
> Yes, I save it to S3 in a different process. It is actually the
> RandomForestClassificationModel.load method (passed an s3 path) where I
> run into problems.
> When you say you load it during map stages, do you mean that you are able
> to directly load a model from inside of a transformation? When I try this,
> it passes the function to a worker, and the load method itself appears to
> attempt to create a new SparkContext, which causes an NPE downstream
> (because creating a SparkContext on the worker is not an appropriate thing
> to do, according to various threads I've found).
>
> Maybe there is a different load function I should be using?
>
> Thanks!
>
> Sumona
>
>
>
> On Thu, Jan 12, 2017 at 6:26 PM ayan guha  wrote:
>
> Hi
>
>
>
> Given training and predictions are two different applications, I typically
> save model objects to hdfs and load it back during prediction map stages.
>
>
>
> Best
>
> Ayan
>
>
>
> On Fri, 13 Jan 2017 at 5:39 am, Sumona Routh  wrote:
>
> Hi all,
>
> I've been working with Spark mllib 2.0.2 RandomForestClassificationModel.
>
> I encountered two frustrating issues and would really appreciate some
> advice:
>
> 1)  RandomForestClassificationModel is effectively not serializable (I
> assume it's referencing something that can't be serialized, since it itself
> extends serializable), so I ended up with the well-known exception:
> org.apache.spark.SparkException: Task not serializable.
> Basically, my original intention was to pass the model as a parameter,
> because which model we use depends on the record we are predicting on.
>
> Has anyone else encountered this? Is this currently being addressed? I
> would expect objects from Spark's own libraries to be usable
> seamlessly in their applications without these types of exceptions.
>
> 2) The RandomForestClassificationModel.load method appears to hang
> indefinitely when executed from inside a map function (which I assume is
> passed to the executor). So, I basically cannot load a model from a worker.
> We have multiple "profiles" that use differently trained models, which are
> accessed from within a map function to run predictions on different sets of
> data.
>
> The thread that is hanging has this as the latest (most pertinent) code:
> org.apache.spark.ml.util.DefaultParamsReader$.
> loadMetadata(ReadWrite.scala:391)
>
> Looking at the code in github, it appears that it is calling sc.textFile.
> I could not find anything stating that this particular function would not
> work from within a map function.
>
> Are there any suggestions as to how I can get this model to work on a real
> production job (either by allowing it to be serializable and passed around
> or loaded from a worker)?
>
> I've extensively POCed this model (saving, loading, transforming,
> 

RE: Can't load a RandomForestClassificationModel in Spark job

2017-02-16 Thread Jianhong Xia
Thanks Hollin.

I will take a look at mleap and will let you know if I have any questions.

Jianhong


From: Hollin Wilkins [mailto:hol...@combust.ml]
Sent: Tuesday, February 14, 2017 11:48 PM
To: Jianhong Xia 
Cc: Sumona Routh ; ayan guha ; 
user@spark.apache.org
Subject: Re: Can't load a RandomForestClassificationModel in Spark job

Hey there,

Creating a new SparkContext on workers will not work, only the driver is 
allowed to own a SparkContext. Are you trying to distribute your model to 
workers so you can create a distributed scoring service? If so, it may be worth 
looking into taking your models outside of a SparkContext and serving them 
separately.

If this is your use case, take a look at MLeap. We use it in production to 
serve high-volume realtime requests from Spark-trained models: 
https://github.com/combust/mleap

Cheers,
Hollin

On Tue, Feb 14, 2017 at 4:46 PM, Jianhong Xia wrote:
Is there any update on this problem?

I encountered the same issue that was mentioned here.

I have CrossValidatorModel.transform(df) running on workers, which requires
a DataFrame as input. However, we only have Arrays on workers. When we deploy
our model in cluster mode, we cannot call createDataFrame on workers. It
gives me this error:


17/02/13 20:21:27 ERROR Detector$: Error while detecting threats
java.lang.NullPointerException
 at 
org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:111)
 at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
 at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:62)
 at 
org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:270)
 at 
com.mycompany.analytics.models.app.serializable.AppModeler.detection(modeler.scala:370)



On the other hand, if we run in local mode, everything works fine.

Just want to know if there is any successful case that runs machine learning
models on the workers.


Thanks,
Jianhong


From: Sumona Routh [mailto:sumos...@gmail.com]
Sent: Thursday, January 12, 2017 6:20 PM
To: ayan guha ;
user@spark.apache.org
Subject: Re: Can't load a RandomForestClassificationModel in Spark job

Yes, I save it to S3 in a different process. It is actually the 
RandomForestClassificationModel.load method (passed an s3 path) where I run 
into problems.
When you say you load it during map stages, do you mean that you are able to 
directly load a model from inside of a transformation? When I try this, it 
passes the function to a worker, and the load method itself appears to attempt 
to create a new SparkContext, which causes an NPE downstream (because creating 
a SparkContext on the worker is not an appropriate thing to do, according to 
various threads I've found).
Maybe there is a different load function I should be using?
Thanks!
Sumona

On Thu, Jan 12, 2017 at 6:26 PM ayan guha wrote:
Hi

Given training and predictions are two different applications, I typically save 
model objects to hdfs and load it back during prediction map stages.
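
For reference, a minimal sketch of that save/load round trip with the Spark ML persistence API (the paths, column names and the trainDf/inputDf DataFrames are hypothetical); in this sketch the model is loaded once on the driver of the prediction application and then applied to a DataFrame:

import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}

// Training application: fit and persist the model.
val model = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .fit(trainDf)
model.write.overwrite().save("hdfs:///models/rf-profile-a")

// Prediction application: load once on the driver, then let transform run distributed.
val loaded = RandomForestClassificationModel.load("hdfs:///models/rf-profile-a")
val scored = loaded.transform(inputDf)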

Best
Ayan

On Fri, 13 Jan 2017 at 5:39 am, Sumona Routh wrote:
Hi all,
I've been working with Spark mllib 2.0.2 RandomForestClassificationModel.
I encountered two frustrating issues and would really appreciate some advice:

1)  RandomForestClassificationModel is effectively not serializable (I assume 
it's referencing something that can't be serialized, since it itself extends 
serializable), so I ended up with the well-known exception: 
org.apache.spark.SparkException: Task not serializable.
Basically, my original intention was to pass the model as a parameter,
because which model we use depends on the record we are predicting on.
Has anyone else encountered this? Is this currently being addressed? I would
expect objects from Spark's own libraries to be usable seamlessly in
their applications without these types of exceptions.
2) The RandomForestClassificationModel.load method appears to hang indefinitely 
when executed from inside a map function (which I assume is passed to the 
executor). So, I basically cannot load a model from a worker. We have multiple 
"profiles" that use differently trained models, which are accessed from within 
a map function to run predictions on different sets of data.
The thread that is hanging has this as the latest (most pertinent) code:
org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:391)
Looking at the code in github, it appears that it is calling sc.textFile. I 
could not find anything stating that this particular function would not work 
from within a map function.
Are there any suggestions as to how I can get this model to work on a real 
production job 

Re: Remove .HiveStaging files

2017-02-16 Thread Xiao Li
Maybe you can check this PR?

https://github.com/apache/spark/pull/16399

Thanks,

Xiao


2017-02-15 15:05 GMT-08:00 KhajaAsmath Mohammed :

> Hi,
>
> I am using Spark temporary tables to write data back to Hive. I have seen
> weird behavior of .hive-staging files after job completion. Does anyone
> know how to delete them, or prevent them from being created, while writing data into Hive?
>
> Thanks,
> Asmath
>


Spark standalone cluster on EC2 error .. Checkpoint..

2017-02-16 Thread shyla deshpande
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
/checkpoint/11ea8862-122c-4614-bc7e-f761bb57ba23/rdd-347/.part-1-attempt-3
could only be replicated to 0 nodes instead of minReplication (=1).  There
are 0 datanode(s) running and no node(s) are excluded in this operation.

This is the error I get when I run my spark streaming app on 2 node EC2
cluster, with 1 master and 1 worker.

Works fine in local mode. Please help.

Thanks


Re: Debugging Spark application

2017-02-16 Thread Md. Rezaul Karim
Thanks, Sam. I will have a look at it.

On Feb 16, 2017 10:06 PM, "Sam Elamin"  wrote:

> I recommend running Spark in local mode when you're first debugging your
> code, just to understand what's happening and step through it, and perhaps catch
> a few errors when you first start off.
>
> I personally use IntelliJ because it's my preference. You can follow this
> guide:
> http://www.bigendiandata.com/2016-08-26-How-to-debug-
> remote-spark-jobs-with-IntelliJ/
>
> Although it's for intellij you can apply the same concepts to eclipse *I
> think*
>
>
> Regards
> Sam
>
>
> On Thu, 16 Feb 2017 at 22:00, Md. Rezaul Karim <
> rezaul.ka...@insight-centre.org> wrote:
>
>> Hi,
>>
>> I was looking for some URLs/documents for getting started on debugging
>> Spark applications.
>>
>> I prefer developing Spark applications with Scala on Eclipse and then
>> package the application jar before submitting.
>>
>>
>>
>> Kind regards,
>> Reza
>>
>>
>>
>>
>


Re: Debugging Spark application

2017-02-16 Thread Sam Elamin
I recommend running Spark in local mode when you're first debugging your code,
just to understand what's happening and step through it, and perhaps catch a
few errors when you first start off.
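
For illustration, a minimal local-mode session to debug against might look like this (the app name is arbitrary):

import org.apache.spark.sql.SparkSession

// Everything runs inside the current JVM, so IDE breakpoints and stepping work.
val spark = SparkSession.builder()
  .master("local[*]")        // use all local cores, no cluster required
  .appName("debug-session")
  .getOrCreate()

spark.range(10).show()       // quick sanity check that the session is up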

I personally use IntelliJ because it's my preference. You can follow this
guide:
http://www.bigendiandata.com/2016-08-26-How-to-debug-remote-spark-jobs-with-IntelliJ/

Although it's for intellij you can apply the same concepts to eclipse *I
think*


Regards
Sam


On Thu, 16 Feb 2017 at 22:00, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi,
>
> I was looking for some URLs/documents for getting started on debugging
> Spark applications.
>
> I prefer developing Spark applications with Scala on Eclipse and then
> package the application jar before submitting.
>
>
>
> Kind regards,
> Reza
>
>
>
>


Debugging Spark application

2017-02-16 Thread Md. Rezaul Karim
Hi,

I was looking for some URLs/documents for getting started on debugging
Spark applications.

I prefer developing Spark applications with Scala on Eclipse and then
package the application jar before submitting.



Kind regards,
Reza


Re: Pretty print a dataframe...

2017-02-16 Thread Michael Armbrust
The toString method of Dataset.queryExecution includes the various plans.
I usually just log that directly.
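
For illustration, a minimal sketch of that (the logger and the Dataset `ds` are placeholders, assuming slf4j is on the classpath):

import org.slf4j.LoggerFactory

val log = LoggerFactory.getLogger("query-plans")

// queryExecution.toString includes the parsed, analyzed, optimized and physical plans.
log.info(ds.queryExecution.toString)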

On Thu, Feb 16, 2017 at 8:26 AM, Muthu Jayakumar  wrote:

> Hello there,
>
> I am trying to write to log-line a dataframe/dataset queryExecution and/or
> its logical plan. The current code...
>
> def explain(extended: Boolean): Unit = {
>   val explain = ExplainCommand(queryExecution.logical, extended = extended)
>   sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
>     // scalastyle:off println
>     r => println(r.getString(0))
>     // scalastyle:on println
>   }
> }
>
> sessionState is not accessible if I were to write my own explain(log:
> LoggingAdapter).
>
> Please advise,
> Muthu
>


Spark on Mesos with Docker in bridge networking mode

2017-02-16 Thread cherryii
I'm getting errors when I try to run my Docker container in bridge networking
mode on Mesos.
Here is my spark-submit script:

/spark/bin/spark-submit \
 --class com.package.MySparkJob \
 --name My-Spark-Job \
 --files /path/config.cfg, ${JAR} \
 --master ${SPARK_MASTER_HOST} \
 --deploy-mode client \
 --supervise \
 --total-executor-cores ${SPARK_EXECUTOR_TOTAL_CORES} \
 --driver-cores ${SPARK_DRIVER_CORES} \
 --driver-memory ${SPARK_DRIVER_MEMORY} \
 --num-executors ${SPARK_NUM_EXECUTORS} \
 --executor-cores ${SPARK_EXECUTOR_CORES} \
 --executor-memory ${SPARK_EXECUTOR_MEMORY} \
 --driver-class-path ${JAR} \
 --conf
"spark.mesos.executor.docker.image=${SPARK_MESOS_EXECUTOR_DOCKER_IMAGE}" \
 --conf
"spark.mesos.executor.docker.volumes=${SPARK_MESOS_EXECUTOR_DOCKER_VOLUMES}"
\
 --conf "spark.mesos.uris=${SPARK_MESOS_URIS}" \
 --conf "spark.executorEnv.OBERON_DB_PASS=${OBERON_DB_PASS}" \
 --conf "spark.executorEnv.S3_SECRET_ACCESS_KEY=${S3_SECRET_ACCESS_KEY}" \
 --conf "spark.executorEnv.S3_ACCESS_KEY=${S3_ACCESS_KEY}" \
 --conf "spark.mesos.executor.home=${SPARK_HOME}" \
 --conf "spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY=${SPARK_MESOS_LIB}" \
 --conf "spark.files.overwrite=true" \
 --conf "spark.shuffle.service.enabled=false" \
 --conf "spark.dynamicAllocation.enabled=false" \
 --conf "spark.ui.port=${PORT_SPARKUI}" \
 --conf "spark.driver.host=${SPARK_PUBLIC_DNS}" \
 --conf "spark.driver.port=${PORT_SPARKDRIVER}" \
 --conf "spark.driver.blockManager.port=${PORT_SPARKBLOCKMANAGER}" \
 --conf "spark.jars=${JAR}" \
 --conf "spark.executor.extraClassPath=${JAR}" \
 ${JAR} 

Here is the error I'm seeing: 
java.net.BindException: Cannot assign requested address: Service
'sparkDriver' failed after 16 retries! Consider explicitly setting the
appropriate port for the service 'sparkDriver' (for example spark.ui.port
for SparkUI) to an available port or increasing spark.port.maxRetries.
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at
io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:125)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:485)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1089)
at
io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:430)
at
io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:415)
at
io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:903)
at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:198)
at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:348)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

I was trying to follow the instructions here:
https://github.com/apache/spark/pull/15120
So in my Marathon JSON I'm defining the ports to use for the Spark driver,
Spark UI and block manager.
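
For reference, that PR separates the address the driver binds to from the address it advertises; a hedged sketch of the relevant settings in SparkConf form (all values are placeholders):

import org.apache.spark.SparkConf

// Placeholder values: the host address is what executors connect back to,
// the container-internal address is what the driver can actually bind to inside Docker.
val conf = new SparkConf()
  .set("spark.driver.host", "192.168.2.1")        // advertised (host) address
  .set("spark.driver.bindAddress", "172.17.0.2")  // bind (container-internal) address
  .set("spark.driver.port", "35000")
  .set("spark.driver.blockManager.port", "35010")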

Can anyone help me get this running in bridge networking mode?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Mesos-with-Docker-in-bridge-networking-mode-tp28397.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Latent Dirichlet Allocation in Spark

2017-02-16 Thread Manish Tripathi
Hi

I am trying to do topic modeling in Spark using Spark's LDA package. Using
Spark 2.0.2 and pyspark API.

I ran the code as below:

from pyspark.ml.clustering import LDA
lda = LDA(featuresCol="tf_features", k=10, seed=1, optimizer="online")
ldaModel = lda.fit(tf_df)

lda_df = ldaModel.transform(tf_df)

I went through the docs to understand the output (the form of data) Spark
generates for LDA.

I understand the ldaModel.describeTopics() method. Gives topics with list
of terms and weights.

But I am not sure I understand the method ldamodel.topicsMatrix().

It gives me this:

[topicsMatrix output screenshot not preserved in the archive]

The docs say it is the distribution of words for each topic (1184 words
as rows, 10 topics as columns, and the values in the cells). But these
values are not probabilities, which is what one would expect for a topic-word
distribution.

They are seemingly arbitrary values greater than 1 (132.76, 3.00 and so on).

Any idea on this?

Thanks


Will Spark ever run the same task at the same time

2017-02-16 Thread Ji Yan
Dear spark users,

Is there any mechanism in Spark that does not guarantee idempotent
execution? For example, for stragglers, the framework might start another task,
assuming the straggler is slow, while the straggler is still running. This
can be a problem when, say, the task is writing to a file: having
the same task running twice at the same time may corrupt the file. From the
documentation, I know that Spark's speculative execution mode is
turned off by default. Does anyone know of any other mechanism in Spark that
may cause a problem in a scenario like this?
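
For reference, a sketch of how speculative execution is configured (the values shown are the defaults as I understand them, not taken from this thread):

import org.apache.spark.SparkConf

// Speculation re-launches slow task attempts; it is the documented duplicate-attempt mechanism.
val conf = new SparkConf()
  .set("spark.speculation", "false")           // off by default: no second attempt for stragglers
  .set("spark.speculation.multiplier", "1.5")  // how many times slower than the median a task must be
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before speculating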

Thanks
Ji



Re: skewed data in join

2017-02-16 Thread Gourav Sengupta
Hi,

Thanks for your kind response. Using a hash key with random numbers increases
the time for processing the data. My entire join for the whole month
finishes within 150 seconds for 471 million records and then stalls for
another 6 mins on 55 million records.

Using hash keys increases the processing time to 11 mins, so I am
not quite clear why I should do that. The overall idea was to ensure that
processing the entire set of around 520 million records takes maybe another 10
seconds more.



Regards,
Gourav Sengupta

On Thu, Feb 16, 2017 at 4:54 PM, Anis Nasir  wrote:

> You can also so something similar to what is mentioned in [1].
>
> The basic idea is to use two hash functions for each key and assign it
> to the least loaded of the two hashed workers.
>
> Cheers,
> Anis
>
>
> [1]. https://melmeric.files.wordpress.com/2014/11/the-
> power-of-both-choices-practical-load-balancing-for-
> distributed-stream-processing-engines.pdf
>
>
> On Fri, 17 Feb 2017 at 01:34, Yong Zhang  wrote:
>
> >> Yes. You have to change your key or, as the big-data term goes, add "salt".
>>
>>
>> Yong
>>
>> --
>> *From:* Gourav Sengupta 
>> *Sent:* Thursday, February 16, 2017 11:11 AM
>> *To:* user
>> *Subject:* skewed data in join
>>
>> Hi,
>>
>> Is there a way to do multiple reducers for joining on skewed data?
>>
>> Regards,
>> Gourav
>>
>


scala.io.Source.fromFile protocol for hadoop

2017-02-16 Thread nancy henry
Hello,



hiveSqlContext.sql(scala.io.Source.fromFile(args(0).toString()).mkString).collect()

I have a file on my local system,

and I am running spark-submit with deploy mode cluster on Hadoop,

so should args(0) point to a file on the Hadoop cluster or to a local file?

For a local file the protocol is file:///;
what is the protocol for a file on Hadoop?
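
As an aside, scala.io.Source.fromFile can only read files that are local to the machine where the driver runs; a hedged sketch of reading the SQL from HDFS instead, via the Hadoop FileSystem API (the path is hypothetical, sc is an existing SparkContext):

import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

val sqlPath = new Path("hdfs:///user/me/query.sql")   // hypothetical HDFS location
val fs = FileSystem.get(sc.hadoopConfiguration)
val in = fs.open(sqlPath)
val query = try Source.fromInputStream(in).mkString finally in.close()

hiveSqlContext.sql(query).collect()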


Re: skewed data in join

2017-02-16 Thread Anis Nasir
You can also so something similar to what is mentioned in [1].

The basic idea is to use two hash functions for each key and assign it
to the least loaded of the two hashed workers.

Cheers,
Anis


[1].
https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
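
As an illustration only (a toy sketch, not code from the paper): pick two candidate partitions with two hash functions and send the key to whichever has received less load so far:

import scala.collection.mutable
import scala.util.hashing.MurmurHash3

val numPartitions = 8
val load = mutable.ArrayBuffer.fill(numPartitions)(0L)   // local bookkeeping for illustration

def choosePartition(key: String): Int = {
  // Two independent hash functions via different seeds, mapped into [0, numPartitions).
  val p1 = (MurmurHash3.stringHash(key, 17) % numPartitions + numPartitions) % numPartitions
  val p2 = (MurmurHash3.stringHash(key, 41) % numPartitions + numPartitions) % numPartitions
  val target = if (load(p1) <= load(p2)) p1 else p2
  load(target) += 1                                      // track how much work each partition has seen
  target
}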


On Fri, 17 Feb 2017 at 01:34, Yong Zhang  wrote:

> Yes. You have to change your key or, as the big-data term goes, add "salt".
>
>
> Yong
>
> --
> *From:* Gourav Sengupta 
> *Sent:* Thursday, February 16, 2017 11:11 AM
> *To:* user
> *Subject:* skewed data in join
>
> Hi,
>
> Is there a way to do multiple reducers for joining on skewed data?
>
> Regards,
> Gourav
>


Re: skewed data in join

2017-02-16 Thread Yong Zhang
Yes. You have to change your key or, as the big-data term goes, add "salt".


Yong
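
A minimal salting sketch (the DataFrames largeDf/smallDf and the join column "key" are hypothetical): append a random salt to the skewed side and replicate the small side once per salt value, so one hot key spreads across several reducers:

import org.apache.spark.sql.functions._

val numSalts = 10

// Skewed (large) side: add a random salt in [0, numSalts).
val saltedLarge = largeDf.withColumn("salt", (rand() * numSalts).cast("int"))

// Small side: replicate each row once per salt value so every salted key finds a match.
val saltedSmall = smallDf.withColumn("salt", explode(array((0 until numSalts).map(lit): _*)))

val joined = saltedLarge.join(saltedSmall, Seq("key", "salt")).drop("salt")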


From: Gourav Sengupta 
Sent: Thursday, February 16, 2017 11:11 AM
To: user
Subject: skewed data in join

Hi,

Is there a way to do multiple reducers for joining on skewed data?

Regards,
Gourav


Pretty print a dataframe...

2017-02-16 Thread Muthu Jayakumar
Hello there,

I am trying to write to log-line a dataframe/dataset queryExecution and/or
its logical plan. The current code...

def explain(extended: Boolean): Unit = {
  val explain = ExplainCommand(queryExecution.logical, extended = extended)
  sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
    // scalastyle:off println
    r => println(r.getString(0))
    // scalastyle:on println
  }
}

sessionState is not accessible if I were to write my own explain(log:
LoggingAdapter).

Please advise,
Muthu


skewed data in join

2017-02-16 Thread Gourav Sengupta
Hi,

Is there a way to do multiple reducers for joining on skewed data?

Regards,
Gourav


Pyspark: out of memory exception during model training

2017-02-16 Thread mzaharchenko
My problem is quite simple: the JVM is running out of memory during model =
dt.fit(train_small). My train_small dataset contains only 100 rows (I have
limited the number of rows to make sure the size of the dataset doesn't cause
the memory overflow). But each row has a column all_features with a long
vector (300+ entries).
Could this be the source of the OOM error?

Here is my code: 

 dt = DecisionTreeRegressor(featuresCol="all_features", labelCol="rating",
predictionCol="prediction")
model = dt.fit(train_small)
predictions = model.transform(test_small)
evaluator = RegressionEvaluator(labelCol="rating",  
predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Stacktrace:

Py4JJavaError: An error occurred while calling o5719.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 13
in stage 8821.0 failed 1 times, most recent failure: Lost task 13.0 in stage
8821.0 (TID 83708, localhost, executor driver): java.lang.OutOfMemoryError:
Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at
org.apache.spark.sql.execution.columnar.NullableColumnBuilder$class.build(NullableColumnBuilder.scala:74)
at
org.apache.spark.sql.execution.columnar.ComplexColumnBuilder.build(ColumnBuilder.scala:91)
at
org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:134)
at
org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:133)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at
org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:133)
at
org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:97)
at
org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:706)
at
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
at
org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:170)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$5.apply(BlockManager.scala:964)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$5.apply(BlockManager.scala:963)
at org.apache.spark.storage.DiskStore.put(DiskStore.scala:57)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:963)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:947)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:887)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:947)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:693)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1456)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1444)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1443)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1443)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1671)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1626)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1615)
at 

Re: Enrichment with static tables

2017-02-16 Thread Gaurav Agarwal
Thanks, that worked for me. Previously I was using the wrong join; that is the
reason it did not work for me.

Thanks

On Feb 16, 2017 01:20, "Sam Elamin"  wrote:

> You can do a join or a union to combine all the dataframes into one fat
> dataframe,
>
> or do a select on the columns you want to produce your transformed
> dataframe.
>
> Not sure if I understand the question though. If the goal is just an
> end-state transformed dataframe, that can easily be done.
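
For illustration, a minimal enrichment sketch in that spirit (all names are hypothetical): convert the RDD to a DataFrame and left-join each static lookup table on its key:

import spark.implicits._

// Hypothetical names: eventsRdd holds (id, countryCode, productCode) tuples, and
// countriesDf / productsDf are the already-loaded static lookup DataFrames.
val eventsDf = eventsRdd.toDF("id", "countryCode", "productCode")

val enriched = eventsDf
  .join(countriesDf, Seq("countryCode"), "left")   // pull in country attributes
  .join(productsDf, Seq("productCode"), "left")    // pull in product attributes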
>
>
> Regards
> Sam
>
> On Wed, Feb 15, 2017 at 6:34 PM, Gaurav Agarwal 
> wrote:
>
>> Hello
>>
>> We want to enrich our Spark RDD, which is loaded with multiple columns and
>> multiple rows, with 3 different tables that I loaded as 3 different Spark
>> DataFrames. Can we write some logic in Spark so I can enrich my Spark RDD
>> with different static tables?
>>
>> Thanks
>>
>>
>