Re: spark source Intellij

2016-01-15 Thread Ted Yu
See:

http://search-hadoop.com/m/q3RTtZbuxxp9p6N1=Re+Best+IDE+Configuration

> On Jan 15, 2016, at 2:19 AM, Sanjeev Verma  wrote:
> 
> I want to configure spark source code into Intellij IDEA Is there any 
> document available / known steps which can guide me to configure spark 
> project in to the Intellij IDEA. 
> 
> Any help will be appreciated
> Thanks

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NPE when using Joda DateTime

2016-01-15 Thread Romain Sagean
Hi, I had a similar problem with Joda Time though i didn't use Kryo, the
solution I found was to use standard java date and time classes instead of
Joda.

2016-01-15 13:16 GMT+01:00 Sean Owen :

> I haven't dug into this, but I agree that something that is transient
> isn't meant to be restored by the default Java serialization
> mechanism. I'd expect the class handles restoring that value as needed
> or in a custom readObject method. And then I don't know how Kryo
> interacts with that.
>
> I don't think you need to install anything. You may end up writing
> your own serialization for Kryo.
>
> Try not using Kryo just to narrow it down?
>
> Hackier solution: send around long timestamps and then make them into
> DateTime locally as needed. Not great.
>
> Or if possible use Java 8, where Joda APIs are part of the JDK.
> Possibly it works then.
>
> On Fri, Jan 15, 2016 at 9:32 AM, Spencer, Alex (Santander)
>  wrote:
> > Hi,
> >
> >
> >
> > I tried Zhu’s recommendation and sadly got the same error. (Again, single
> > map worked by the groupBy / flatMap generates this error).
> >
> >
> >
> > Does Kryo has a bug i.e. it’s not serialising all components needed, or
> do
> > I just need to get our IT team to install those magro Serializers as
> > suggested by Todd? If that variable is transient then actually that means
> > Kryo is working as it’s meant to?
> >
> >
> >
> > Am I at the point where I should pull apart the source code and build my
> own
> > DateTime class? I hate reinventing the wheel though.
> >
> >
> >
> > Thanks,
> >
> > Alex.
> >
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Romain Sagean*

*romain.sag...@hupi.fr *


Re: AIC in Linear Regression in ml pipeline

2016-01-15 Thread Yanbo Liang
Hi Arunkumar,

It does not support output AIC value for Linear Regression currently. This
feature is under development and will be released at Spark 2.0.

Thanks
Yanbo

2016-01-15 17:20 GMT+08:00 Arunkumar Pillai :

> Hi
>
> Is it possible to get AIC value in Linear Regression using ml pipeline ?
> Is so please help me
>
> --
> Thanks and Regards
> Arun
>


Re: NPE when using Joda DateTime

2016-01-15 Thread Sean Owen
I haven't dug into this, but I agree that something that is transient
isn't meant to be restored by the default Java serialization
mechanism. I'd expect the class handles restoring that value as needed
or in a custom readObject method. And then I don't know how Kryo
interacts with that.

I don't think you need to install anything. You may end up writing
your own serialization for Kryo.

Try not using Kryo just to narrow it down?

Hackier solution: send around long timestamps and then make them into
DateTime locally as needed. Not great.

Or if possible use Java 8, where Joda APIs are part of the JDK.
Possibly it works then.

On Fri, Jan 15, 2016 at 9:32 AM, Spencer, Alex (Santander)
 wrote:
> Hi,
>
>
>
> I tried Zhu’s recommendation and sadly got the same error. (Again, single
> map worked by the groupBy / flatMap generates this error).
>
>
>
> Does Kryo has a bug i.e. it’s not serialising all components needed, or do
> I just need to get our IT team to install those magro Serializers as
> suggested by Todd? If that variable is transient then actually that means
> Kryo is working as it’s meant to?
>
>
>
> Am I at the point where I should pull apart the source code and build my own
> DateTime class? I hate reinventing the wheel though.
>
>
>
> Thanks,
>
> Alex.
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: NPE when using Joda DateTime

2016-01-15 Thread Spencer, Alex (Santander)
I'll try the hackier way for now - given the limitation of not being able to 
modify the environment we've been given.
Thanks all for your help so far.

Kind Regards,
Alex.
 
-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: 15 January 2016 12:17
To: Spencer, Alex (Santander)
Cc: Shixiong(Ryan) Zhu; user@spark.apache.org
Subject: Re: NPE when using Joda DateTime

I haven't dug into this, but I agree that something that is transient isn't 
meant to be restored by the default Java serialization mechanism. I'd expect 
the class handles restoring that value as needed or in a custom readObject 
method. And then I don't know how Kryo interacts with that.

I don't think you need to install anything. You may end up writing your own 
serialization for Kryo.

Try not using Kryo just to narrow it down?

Hackier solution: send around long timestamps and then make them into DateTime 
locally as needed. Not great.

Or if possible use Java 8, where Joda APIs are part of the JDK.
Possibly it works then.

On Fri, Jan 15, 2016 at 9:32 AM, Spencer, Alex (Santander) 
 wrote:
> Hi,
>
>
>
> I tried Zhu’s recommendation and sadly got the same error. (Again, 
> single map worked by the groupBy / flatMap generates this error).
>
>
>
> Does Kryo has a bug i.e. it’s not serialising all components needed, 
> or do I just need to get our IT team to install those magro 
> Serializers as suggested by Todd? If that variable is transient then 
> actually that means Kryo is working as it’s meant to?
>
>
>
> Am I at the point where I should pull apart the source code and build 
> my own DateTime class? I hate reinventing the wheel though.
>
>
>
> Thanks,
>
> Alex.
>
>
Emails aren't always secure, and they may be intercepted or changed after
they've been sent. Santander doesn't accept liability if this happens. If you
think someone may have interfered with this email, please get in touch with the
sender another way. This message doesn't create or change any contract.
Santander doesn't accept responsibility for damage caused by any viruses
contained in this email or its attachments. Emails may be monitored. If you've
received this email by mistake, please let the sender know at once that it's
gone to the wrong person and then destroy it without copying, using, or telling
anyone about its contents.
Santander UK plc Reg. No. 2294747 and Abbey National Treasury Services plc Reg.
No. 2338548 Registered Offices: 2 Triton Square, Regent's Place, London NW1 3AN.
Registered in England. www.santander.co.uk. Authorised by the Prudential
Regulation Authority and regulated by the Financial Conduct Authority and the
Prudential Regulation Authority. FCA Reg. No. 106054 and 146003 respectively.
Santander Sharedealing is a trading name of Abbey Stockbrokers Limited Reg. No.
02666793. Registered Office: Kingfisher House, Radford Way, Billericay, Essex
CM12 0GZ. Authorised and regulated by the Financial Conduct Authority. FCA Reg.
No. 154210. You can check this on the Financial Services Register by visiting
the FCA’s website www.fca.org.uk/register or by contacting the FCA on 0800 111
6768. Santander UK plc is also licensed by the Financial Supervision Commission
of the Isle of Man for its branch in the Isle of Man. Deposits held with the
Isle of Man branch are covered by the Isle of Man Depositors’ Compensation
Scheme as set out in the Isle of Man Depositors’ Compensation Scheme Regulations
2010. In the Isle of Man, Santander UK plc’s principal place of business is at
19/21 Prospect Hill, Douglas, Isle of Man, IM1 1ET. Santander and the flame logo
are registered trademarks.
Santander Asset Finance plc. Reg. No. 1533123. Registered Office: 2 Triton
Square, Regent’s Place, London NW1 3AN. Registered in England. Santander
Corporate & Commercial is a brand name used by Santander UK plc, Abbey National
Treasury Services plc and Santander Asset Finance plc.
Ref:[PDB#1-4A]


Re: Using JDBC clients with "Spark on Hive"

2016-01-15 Thread Daniel Darabos
Does Hive JDBC work if you are not using Spark as a backend? I just had
very bad experience with Hive JDBC in general. E.g. half the JDBC protocol
is not implemented (https://issues.apache.org/jira/browse/HIVE-3175, filed
in 2012).

On Fri, Jan 15, 2016 at 2:15 AM, sdevashis  wrote:

> Hello Experts,
>
> I am getting started with Hive with Spark as the query engine. I built the
> package from sources. I am able to invoke Hive CLI and run queries and see
> in Ambari that Spark application are being created confirming hive is using
> Spark as the engine.
>
> However other than Hive CLI, I am not able to run queries from any other
> clients that use the JDBC to connect to hive through thrift. I tried
> Squirrel, Aginity Netezza workbench, and even Hue.
>
> No yarn applications are getting created, the query times out after
> sometime. Nothing gets into /tmp/user/hive.log Am I missing something?
>
> Again I am using Hive on Spark and not spark SQL.
>
> Version Info:
> Spark 1.4.1 built for Hadoop 2.4
>
>
> Thank you in advance for any pointers.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-JDBC-clients-with-Spark-on-Hive-tp25976.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


spark source Intellij

2016-01-15 Thread Sanjeev Verma
I want to configure spark source code into Intellij IDEA Is there any
document available / known steps which can guide me to configure spark
project in to the Intellij IDEA.

Any help will be appreciated
Thanks


Re: DataFrame partitionBy to a single Parquet file (per partition)

2016-01-15 Thread Arkadiusz Bicz
Why do you need to be only one file? Spark doing good job writing in
many files.

On Fri, Jan 15, 2016 at 7:48 AM, Patrick McGloin
 wrote:
> Hi,
>
> I would like to reparation / coalesce my data so that it is saved into one
> Parquet file per partition. I would also like to use the Spark SQL
> partitionBy API. So I could do that like this:
>
> df.coalesce(1).write.partitionBy("entity", "year", "month", "day",
> "status").mode(SaveMode.Append).parquet(s"$location")
>
> I've tested this and it doesn't seem to perform well. This is because there
> is only one partition to work on in the dataset and all the partitioning,
> compression and saving of files has to be done by one CPU core.
>
> I could rewrite this to do the partitioning manually (using filter with the
> distinct partition values for example) before calling coalesce.
>
> But is there a better way to do this using the standard Spark SQL API?
>
> Best regards,
>
> Patrick
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



sqlContext.cacheTable("tableName") vs dataFrame.cache()

2016-01-15 Thread George Sigletos
According to the documentation they are exactly the same, but in my queries

dataFrame.cache()

results in much faster execution times vs doing

sqlContext.cacheTable("tableName")

Is there any explanation about this? I am not caching the RDD prior to
creating the dataframe. Using Pyspark on Spark 1.5.2

Kind regards,
George


Re: Spark App -Yarn-Cluster-Mode ===> Hadoop_conf_**.zip file.

2016-01-15 Thread Ted Yu
bq. check application tracking
page:http://slave1:8088/proxy/application_1452763526769_0011/
Then , ...

Have you done the above to see what error was in each attempt ?

Which Spark / hadoop release are you using ?

Thanks

On Fri, Jan 15, 2016 at 5:58 AM, Siddharth Ubale <
siddharth.ub...@syncoms.com> wrote:

> Hi,
>
>
>
> I am trying to run a Spark streaming application in yarn-cluster mode.
> However I am facing an issue where the job ends asking for a particular
> Hadoop_conf_**.zip file in hdfs location.
>
> Can any one guide with this?
>
> The application works fine in local mode only it stops abruptly for want
> of memory.
>
>
>
> Below is the error stack trace:
>
>
>
> diagnostics: Application application_1452763526769_0011 failed 2 times due
> to AM Container for appattempt_1452763526769_0011_02 exited with
> exitCode: -1000
>
> For more detailed output, check application tracking page:
> http://slave1:8088/proxy/application_1452763526769_0011/Then, click on
> links to logs of each attempt.
>
> Diagnostics: File does not exist:
> hdfs://slave1:9000/user/hduser/.sparkStaging/application_1452763526769_0011/__hadoop_conf__1057113228186399290.zip
>
> *java.io.FileNotFoundException: File does not exist:
> hdfs://slave1:9000/user/hduser/.sparkStaging/application_1452763526769_0011/__hadoop_conf__1057113228186399290.zip*
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
>
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:251)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
>
> at
> org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
>
> at
> org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
>
> at java.security.AccessController.doPrivileged(Native
> Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Failing this attempt. Failing the application.
>
> ApplicationMaster host: N/A
>
> ApplicationMaster RPC port: -1
>
> queue: default
>
> start time: 1452866026622
>
> final status: FAILED
>
> tracking URL:
> http://slave1:8088/cluster/app/application_1452763526769_0011
>
> user: hduser
>
> Exception in thread "main" org.apache.spark.SparkException: Application
> application_1452763526769_0011 finished with failed status
>
> at
> org.apache.spark.deploy.yarn.Client.run(Client.scala:841)
>
> at
> org.apache.spark.deploy.yarn.Client$.main(Client.scala:867)
>
> at org.apache.spark.deploy.yarn.Client.main(Client.scala)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:497)
>
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>
> at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>
> at
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> 16/01/15 19:23:53 INFO Utils: Shutdown hook called
>
> 16/01/15 19:23:53 INFO Utils: Deleting directory
> 

jobs much slower in cluster mode vs local

2016-01-15 Thread Saif.A.Ellafi
Hello,

In general, I am usually able to run spark submit jobs in local mode, in a 
32-cores node with plenty of memory ram. The performance is significantly 
faster in local mode than when using a cluster of spark workers.

How can this be explained and what measures can one take in order to improve 
such performance?
Usually a job that takes 35 seconds in local mode takes around 48 seconds in a 
small cluster.

Thanks,
Saif



Re: jobs much slower in cluster mode vs local

2016-01-15 Thread Jiří Syrový
Hi,

you can try to use spark job server and submit jobs to it. The thing is
that the most expensive part is context creation.

J.

2016-01-15 15:28 GMT+01:00 :

> Hello,
>
> In general, I am usually able to run spark submit jobs in local mode, in a
> 32-cores node with plenty of memory ram. The performance is significantly
> faster in local mode than when using a cluster of spark workers.
>
> How can this be explained and what measures can one take in order to
> improve such performance?
> Usually a job that takes 35 seconds in local mode takes around 48 seconds
> in a small cluster.
>
> Thanks,
> Saif
>
>


Spark App -Yarn-Cluster-Mode ===> Hadoop_conf_**.zip file.

2016-01-15 Thread Siddharth Ubale
Hi,

I am trying to run a Spark streaming application in yarn-cluster mode. However 
I am facing an issue where the job ends asking for a particular 
Hadoop_conf_**.zip file in hdfs location.
Can any one guide with this?
The application works fine in local mode only it stops abruptly for want of 
memory.

Below is the error stack trace:

diagnostics: Application application_1452763526769_0011 failed 2 times due to 
AM Container for appattempt_1452763526769_0011_02 exited with  exitCode: 
-1000
For more detailed output, check application tracking 
page:http://slave1:8088/proxy/application_1452763526769_0011/Then, click on 
links to logs of each attempt.
Diagnostics: File does not exist: 
hdfs://slave1:9000/user/hduser/.sparkStaging/application_1452763526769_0011/__hadoop_conf__1057113228186399290.zip
java.io.FileNotFoundException: File does not exist: 
hdfs://slave1:9000/user/hduser/.sparkStaging/application_1452763526769_0011/__hadoop_conf__1057113228186399290.zip
at 
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
at 
org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:251)
at 
org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
at 
org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at 
org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at 
org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
at 
org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Failing this attempt. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1452866026622
final status: FAILED
tracking URL: 
http://slave1:8088/cluster/app/application_1452763526769_0011
user: hduser
Exception in thread "main" org.apache.spark.SparkException: Application 
application_1452763526769_0011 finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:841)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:867)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/01/15 19:23:53 INFO Utils: Shutdown hook called
16/01/15 19:23:53 INFO Utils: Deleting directory 
/tmp/spark-b6ebcb83-efff-432a-9a7a-b4764f482d81
java.lang.UNIXProcess$ProcessPipeOutputStream@7a0a6f73  1



Siddharth Ubale,
Synchronized Communications
#43, Velankani Tech Park, Block No. II,
3rd Floor, Electronic City Phase I,
Bangalore – 560 100
Tel : +91 80 3202 4060
Web: www.syncoms.com
[LogoNEWmohLARGE]
London|Bangalore|Orlando

we innovate, plan, execute, and transform the business​



Stacking transformations and using intermediate results in the next transformation

2016-01-15 Thread Richard Siebeling
Hi,

we're stacking multiple RDD operations on each other, for example as a
source we have a RDD[List[String]] like

["a", "b, c", "d"]
["a", "d, a", "d"]

In the first step we split the second column in two columns, in the next
step we filter the data on column 3 = "c" and in the final step we're doing
something else. The point is that it needs to be flexible (the user adds
custom transformations and they are stacked on top of each other like the
example above, which transformations are added by the user is therefor not
known upfront).

The transformations itself are no problems but we want to keep track of the
added columns, the dropped columns and the updated columns. In the example
above, the second column is dropped and two new columns are added.

The intermediate result here will be

["a", "b", "c", "d"]
["a", "d", "a", "d"]

And the final result will be

["a", "b", "c", "d"]

What I would like to know is after each transformation which columns are
added, which colunms are dropped and which ones are updated.This is
information that's needed to execute the next transformation.

I was thinking of two possible scenario's:

1. capture the metadata and store that in the RDD, effectively creating a
RDD[List[String], List[Column], List[Column], List[Column]) object. Where
the last three List[Column] contain the new, dropped or updated columns.
This will result in an RDD with a lot of extra information on each row.
That information is not needed on each row but rather one time for the
whole split transformation

2. use accumulators to store the new, updated and dropped columns. But I
don't think this is feasible

Are there any better scenario's or how could I accomplish such a scenario?

thanks in advance,
Richard


Re: Serialization stack error

2016-01-15 Thread Ted Yu
Here is signature for Get:

public class Get extends Query

  implements Row, Comparable {

It is not Serializable.


FYI

On Fri, Jan 15, 2016 at 6:37 AM, beeshma r  wrote:

> HI i am trying to get data from Solr server .
>
> This is my code
>
> /*input is JavaRDD li
> *output is  JavaRDD for scanning Hbase*/
>
>
> public static JavaRDD getdocs(JavaRDD li)
> {
>
> JavaRDD newdocs=li;
>
> JavaRDD n=newdocs.map(new Function(){
>
> public Get call(SolrDocumentList si) throws IOException
> {
> Get get = null;
>
> for (SolrDocument doc : si) {
> get = new Get(Bytes.toBytes(((String)
> doc.getFieldValue("id";
>
> }
>
> return get;
>
> }
>
>
> }
>
>
> issue am getting below error
>
> 16/01/15 06:05:55 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 0, localhost, PROCESS_LOCAL, 2815 bytes)
> 16/01/15 06:05:55 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 16/01/15 06:05:55 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
> config:maxConnections=128=32=false
> 16/01/15 06:05:57 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)
>
>
>
> *java.io.NotSerializableException:
> org.apache.hadoop.hbase.client.GetSerialization stack:- object not
> serializable (class: org.apache.hadoop.hbase.client.Get, value:
> {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
> - element of array (index: 0)*
> - array (class [Ljava.lang.Object;, size 6)
> at
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:724)
> 16/01/15 06:05:57 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had
> a not serializable result: org.apache.hadoop.hbase.client.Get
> Serialization stack:
> - object not serializable (class: org.apache.hadoop.hbase.client.Get,
> value:
> {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
> - element of array (index: 0)
> - array (class [Ljava.lang.Object;, size 6); not retrying
> 16/01/15 06:05:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
> 16/01/15 06:05:57 INFO TaskSchedulerImpl: Cancelling stage 0
> 16/01/15 06:05:57 INFO DAGScheduler: ResultStage 0 (collect at
> App.java:278) failed in 2.481 s
> 16/01/15 06:05:57 INFO DAGScheduler: Job 0 failed: collect at
> App.java:278, took 3.378240 s
> [WARNING]
> java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
> at java.lang.Thread.run(Thread.java:724)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
> org.apache.hadoop.hbase.client.Get
> Serialization stack:
> - object not serializable (class: org.apache.hadoop.hbase.client.Get,
> value:
> {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
> - element of array (index: 0)
> - array (class [Ljava.lang.Object;, size 6)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at
> 

RE: jobs much slower in cluster mode vs local

2016-01-15 Thread Spencer, Alex (Santander)
That's not that much of a difference given the overhead of cluster management. 
I would have thought a job should take minutes before you'll see a performance 
improvement on using cluster mode?

Kind Regards,
Alex.

From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: 15 January 2016 14:29
To: user@spark.apache.org
Subject: jobs much slower in cluster mode vs local

Hello,

In general, I am usually able to run spark submit jobs in local mode, in a 
32-cores node with plenty of memory ram. The performance is significantly 
faster in local mode than when using a cluster of spark workers.

How can this be explained and what measures can one take in order to improve 
such performance?
Usually a job that takes 35 seconds in local mode takes around 48 seconds in a 
small cluster.

Thanks,
Saif

Emails aren't always secure, and they may be intercepted or changed after
they've been sent. Santander doesn't accept liability if this happens. If you
think someone may have interfered with this email, please get in touch with the
sender another way. This message doesn't create or change any contract.
Santander doesn't accept responsibility for damage caused by any viruses
contained in this email or its attachments. Emails may be monitored. If you've
received this email by mistake, please let the sender know at once that it's
gone to the wrong person and then destroy it without copying, using, or telling
anyone about its contents.
Santander UK plc Reg. No. 2294747 and Abbey National Treasury Services plc Reg.
No. 2338548 Registered Offices: 2 Triton Square, Regent's Place, London NW1 3AN.
Registered in England. www.santander.co.uk. Authorised by the Prudential
Regulation Authority and regulated by the Financial Conduct Authority and the
Prudential Regulation Authority. FCA Reg. No. 106054 and 146003 respectively.
Santander Sharedealing is a trading name of Abbey Stockbrokers Limited Reg. No.
02666793. Registered Office: Kingfisher House, Radford Way, Billericay, Essex
CM12 0GZ. Authorised and regulated by the Financial Conduct Authority. FCA Reg.
No. 154210. You can check this on the Financial Services Register by visiting
the FCA’s website www.fca.org.uk/register or by contacting the FCA on 0800 111
6768. Santander UK plc is also licensed by the Financial Supervision Commission
of the Isle of Man for its branch in the Isle of Man. Deposits held with the
Isle of Man branch are covered by the Isle of Man Depositors’ Compensation
Scheme as set out in the Isle of Man Depositors’ Compensation Scheme Regulations
2010. In the Isle of Man, Santander UK plc’s principal place of business is at
19/21 Prospect Hill, Douglas, Isle of Man, IM1 1ET. Santander and the flame logo
are registered trademarks.
Santander Asset Finance plc. Reg. No. 1533123. Registered Office: 2 Triton
Square, Regent’s Place, London NW1 3AN. Registered in England. Santander
Corporate & Commercial is a brand name used by Santander UK plc, Abbey National
Treasury Services plc and Santander Asset Finance plc.
Ref:[PDB#1-4A]
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

simultaneous actions

2016-01-15 Thread Kira
Hi,

Can we run *simultaneous* actions on the *same RDD* ?; if yes how can this
be done ?

Thank you,
Regards



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Serialization stack error

2016-01-15 Thread beeshma r
HI i am trying to get data from Solr server .

This is my code

/*input is JavaRDD li
*output is  JavaRDD for scanning Hbase*/


public static JavaRDD getdocs(JavaRDD li)
{

JavaRDD newdocs=li;

JavaRDD n=newdocs.map(new Function(){

public Get call(SolrDocumentList si) throws IOException
{
Get get = null;

for (SolrDocument doc : si) {
get = new Get(Bytes.toBytes(((String)
doc.getFieldValue("id";

}

return get;

}


}


issue am getting below error

16/01/15 06:05:55 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
0, localhost, PROCESS_LOCAL, 2815 bytes)
16/01/15 06:05:55 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/01/15 06:05:55 INFO HttpClientUtil: Creating new http client,
config:maxConnections=128=32=false
16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
config:maxConnections=128=32=false
16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
config:maxConnections=128=32=false
16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
config:maxConnections=128=32=false
16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
config:maxConnections=128=32=false
16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
config:maxConnections=128=32=false
16/01/15 06:05:57 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)



*java.io.NotSerializableException:
org.apache.hadoop.hbase.client.GetSerialization stack:- object not
serializable (class: org.apache.hadoop.hbase.client.Get, value:
{"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
- element of array (index: 0)*
- array (class [Ljava.lang.Object;, size 6)
at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
16/01/15 06:05:57 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a
not serializable result: org.apache.hadoop.hbase.client.Get
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.client.Get,
value:
{"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 6); not retrying
16/01/15 06:05:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
have all completed, from pool
16/01/15 06:05:57 INFO TaskSchedulerImpl: Cancelling stage 0
16/01/15 06:05:57 INFO DAGScheduler: ResultStage 0 (collect at
App.java:278) failed in 2.481 s
16/01/15 06:05:57 INFO DAGScheduler: Job 0 failed: collect at App.java:278,
took 3.378240 s
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:724)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
org.apache.hadoop.hbase.client.Get
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.client.Get,
value:
{"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 6)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at

Re: Spark App -Yarn-Cluster-Mode ===> Hadoop_conf_**.zip file.

2016-01-15 Thread Ted Yu
Interesting. Which hbase / Phoenix releases are you using ?
The following method has been removed from Put:

   public Put setWriteToWAL(boolean write) {

Please make sure the Phoenix release is compatible with your hbase version.

Cheers

On Fri, Jan 15, 2016 at 6:20 AM, Siddharth Ubale <
siddharth.ub...@syncoms.com> wrote:

> Hi,
>
>
>
>
>
> This is the log from the application :
>
>
>
> 16/01/15 19:23:19 INFO yarn.ApplicationMaster: Unregistering
> ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before
> final status was reported.)
>
> 16/01/15 19:23:19 INFO yarn.ApplicationMaster: Deleting staging directory
> .sparkStaging/application_1452763526769_0011
>
> 16/01/15 19:23:19 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Shutting down remote daemon.
>
> 16/01/15 19:23:19 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Remote daemon shut down; proceeding with flushing remote transports.
>
> 16/01/15 19:23:19 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Remoting shut down.
>
> 16/01/15 19:23:19 INFO util.Utils: Shutdown hook called
>
> 16/01/15 19:23:19 INFO
> client.HConnectionManager$HConnectionImplementation: Closing zookeeper
> sessionid=0x1523f753f6f0061
>
> 16/01/15 19:23:19 INFO zookeeper.ClientCnxn: EventThread shut down
>
> 16/01/15 19:23:19 INFO zookeeper.ZooKeeper: Session: 0x1523f753f6f0061
> closed
>
> 16/01/15 19:23:19 ERROR yarn.ApplicationMaster: User class threw
> exception: java.lang.NoSuchMethodError:
> org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Put;
>
> java.lang.NoSuchMethodError:
> org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Put;
>
> at
> org.apache.phoenix.schema.PTableImpl$PRowImpl.newMutations(PTableImpl.java:639)
>
> at
> org.apache.phoenix.schema.PTableImpl$PRowImpl.(PTableImpl.java:632)
>
> at
> org.apache.phoenix.schema.PTableImpl.newRow(PTableImpl.java:557)
>
> at
> org.apache.phoenix.schema.PTableImpl.newRow(PTableImpl.java:573)
>
> at
> org.apache.phoenix.execute.MutationState.addRowMutations(MutationState.java:185)
>
> at
> org.apache.phoenix.execute.MutationState.access$200(MutationState.java:79)
>
> at
> org.apache.phoenix.execute.MutationState$2.init(MutationState.java:258)
>
> at
> org.apache.phoenix.execute.MutationState$2.(MutationState.java:255)
>
> at
> org.apache.phoenix.execute.MutationState.toMutations(MutationState.java:253)
>
> at
> org.apache.phoenix.execute.MutationState.toMutations(MutationState.java:243)
>
> at
> org.apache.phoenix.schema.MetaDataClient.createTableInternal(MetaDataClient.java:1840)
>
> at
> org.apache.phoenix.schema.MetaDataClient.createTable(MetaDataClient.java:744)
>
> at
> org.apache.phoenix.compile.CreateTableCompiler$2.execute(CreateTableCompiler.java:186)
>
> at
> org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:303)
>
> at
> org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:295)
>
> at
> org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
>
> at
> org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:293)
>
> at
> org.apache.phoenix.jdbc.PhoenixStatement.executeUpdate(PhoenixStatement.java:1236)
>
> at
> org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1891)
>
> at
> org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
>
> at
> org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
>
> at
> org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
>
> at
> org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
>
> at
> org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
>
> at
> org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
>
> at
> java.sql.DriverManager.getConnection(DriverManager.java:664)
>
> at
> java.sql.DriverManager.getConnection(DriverManager.java:270)
>
> at
> spark.phoenix.PhoenixConnect.getConnection(PhoenixConnect.java:26)
>
> at
> spark.stream.eventStream.startStream(eventStream.java:105)
>
> at
> time.series.wo.agg.InputStreamSpark.main(InputStreamSpark.java:38)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> 

Re: Serialization stack error

2016-01-15 Thread Ted Yu
Can you encapsulate your map function such that it returns data type other
than Get ?

You can perform query to hbase but don't return Get.

Cheers

On Fri, Jan 15, 2016 at 6:46 AM, beeshma r  wrote:

> Hi Ted ,
>
> Any suggestions for changing this piece of code?
>
> public static JavaRDD getdocs(JavaRDD<
> SolrDocumentList> li)
> {
>
> JavaRDD newdocs=li;
>
> JavaRDD n=newdocs.map(new Function(){
>
> public Get call(SolrDocumentList si) throws IOException
> {
> Get get = null;
>
> for (SolrDocument doc : si) {
> get = new Get(Bytes.toBytes(((String)
> doc.getFieldValue("id";
>
> }
>
> return get;
>
> }
>
>
> }
>
>
>
> On Fri, Jan 15, 2016 at 6:40 AM, Ted Yu  wrote:
>
>> Here is signature for Get:
>>
>> public class Get extends Query
>>
>>   implements Row, Comparable {
>>
>> It is not Serializable.
>>
>>
>> FYI
>>
>> On Fri, Jan 15, 2016 at 6:37 AM, beeshma r  wrote:
>>
>>> HI i am trying to get data from Solr server .
>>>
>>> This is my code
>>>
>>> /*input is JavaRDD li
>>> *output is  JavaRDD for scanning Hbase*/
>>>
>>>
>>> public static JavaRDD getdocs(JavaRDD li)
>>> {
>>>
>>> JavaRDD newdocs=li;
>>>
>>> JavaRDD n=newdocs.map(new Function(){
>>>
>>> public Get call(SolrDocumentList si) throws IOException
>>> {
>>> Get get = null;
>>>
>>> for (SolrDocument doc : si) {
>>> get = new Get(Bytes.toBytes(((String)
>>> doc.getFieldValue("id";
>>>
>>> }
>>>
>>> return get;
>>>
>>> }
>>>
>>>
>>> }
>>>
>>>
>>> issue am getting below error
>>>
>>> 16/01/15 06:05:55 INFO TaskSetManager: Starting task 0.0 in stage 0.0
>>> (TID 0, localhost, PROCESS_LOCAL, 2815 bytes)
>>> 16/01/15 06:05:55 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
>>> 16/01/15 06:05:55 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>>> config:maxConnections=128=32=false
>>> 16/01/15 06:05:57 ERROR Executor: Exception in task 0.0 in stage 0.0
>>> (TID 0)
>>>
>>>
>>>
>>> *java.io.NotSerializableException:
>>> org.apache.hadoop.hbase.client.GetSerialization stack:- object not
>>> serializable (class: org.apache.hadoop.hbase.client.Get, value:
>>> {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
>>> - element of array (index: 0)*
>>> - array (class [Ljava.lang.Object;, size 6)
>>> at
>>> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>>> at
>>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>> at
>>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:724)
>>> 16/01/15 06:05:57 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0)
>>> had a not serializable result: org.apache.hadoop.hbase.client.Get
>>> Serialization stack:
>>> - object not serializable (class:
>>> org.apache.hadoop.hbase.client.Get, value:
>>> {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
>>> - element of array (index: 0)
>>> - array (class [Ljava.lang.Object;, size 6); not retrying
>>> 16/01/15 06:05:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
>>> tasks have all completed, from pool
>>> 16/01/15 06:05:57 INFO TaskSchedulerImpl: Cancelling stage 0
>>> 16/01/15 06:05:57 INFO DAGScheduler: ResultStage 0 (collect at
>>> App.java:278) failed in 2.481 s
>>> 16/01/15 06:05:57 INFO DAGScheduler: Job 0 failed: collect at
>>> App.java:278, took 3.378240 s
>>> [WARNING]
>>> java.lang.reflect.InvocationTargetException
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> 

Re: simultaneous actions

2016-01-15 Thread Jonathan Coveney
Threads

El viernes, 15 de enero de 2016, Kira  escribió:

> Hi,
>
> Can we run *simultaneous* actions on the *same RDD* ?; if yes how can this
> be done ?
>
> Thank you,
> Regards
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
>
>


RE: NPE when using Joda DateTime

2016-01-15 Thread Spencer, Alex (Santander)
OK, this isn’t very efficient, but I’ve found a solution:

I am still passing around joda DateTime’s – however if I ever want to use any 
of the minusHours/minusDays etc “transient” variables, I simply do this:

var tempDate = new org.joda.time.DateTime(transaction.date.toInstant)

then:

tempDate.minusHours(1) works fine.

Kind Regards,
Alex.

From: Romain Sagean [mailto:romain.sag...@hupi.fr]
Sent: 15 January 2016 13:08
To: Sean Owen
Cc: Spencer, Alex (Santander); Shixiong(Ryan) Zhu; user@spark.apache.org
Subject: Re: NPE when using Joda DateTime

Hi, I had a similar problem with Joda Time though i didn't use Kryo, the 
solution I found was to use standard java date and time classes instead of Joda.

2016-01-15 13:16 GMT+01:00 Sean Owen 
>:
I haven't dug into this, but I agree that something that is transient
isn't meant to be restored by the default Java serialization
mechanism. I'd expect the class handles restoring that value as needed
or in a custom readObject method. And then I don't know how Kryo
interacts with that.

I don't think you need to install anything. You may end up writing
your own serialization for Kryo.

Try not using Kryo just to narrow it down?

Hackier solution: send around long timestamps and then make them into
DateTime locally as needed. Not great.

Or if possible use Java 8, where Joda APIs are part of the JDK.
Possibly it works then.

On Fri, Jan 15, 2016 at 9:32 AM, Spencer, Alex (Santander)
> wrote:
> Hi,
>
>
>
> I tried Zhu’s recommendation and sadly got the same error. (Again, single
> map worked by the groupBy / flatMap generates this error).
>
>
>
> Does Kryo has a bug i.e. it’s not serialising all components needed, or do
> I just need to get our IT team to install those magro Serializers as
> suggested by Todd? If that variable is transient then actually that means
> Kryo is working as it’s meant to?
>
>
>
> Am I at the point where I should pull apart the source code and build my own
> DateTime class? I hate reinventing the wheel though.
>
>
>
> Thanks,
>
> Alex.
>
>
-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org



--
Romain Sagean

romain.sag...@hupi.fr
[cid:image001.png@01D14F9A.B07105D0]
Emails aren't always secure, and they may be intercepted or changed after
they've been sent. Santander doesn't accept liability if this happens. If you
think someone may have interfered with this email, please get in touch with the
sender another way. This message doesn't create or change any contract.
Santander doesn't accept responsibility for damage caused by any viruses
contained in this email or its attachments. Emails may be monitored. If you've
received this email by mistake, please let the sender know at once that it's
gone to the wrong person and then destroy it without copying, using, or telling
anyone about its contents.
Santander UK plc Reg. No. 2294747 and Abbey National Treasury Services plc Reg.
No. 2338548 Registered Offices: 2 Triton Square, Regent's Place, London NW1 3AN.
Registered in England. www.santander.co.uk. Authorised by the Prudential
Regulation Authority and regulated by the Financial Conduct Authority and the
Prudential Regulation Authority. FCA Reg. No. 106054 and 146003 respectively.
Santander Sharedealing is a trading name of Abbey Stockbrokers Limited Reg. No.
02666793. Registered Office: Kingfisher House, Radford Way, Billericay, Essex
CM12 0GZ. Authorised and regulated by the Financial Conduct Authority. FCA Reg.
No. 154210. You can check this on the Financial Services Register by visiting
the FCA’s website www.fca.org.uk/register or by contacting the FCA on 0800 111
6768. Santander UK plc is also licensed by the Financial Supervision Commission
of the Isle of Man for its branch in the Isle of Man. Deposits held with the
Isle of Man branch are covered by the Isle of Man Depositors’ Compensation
Scheme as set out in the Isle of Man Depositors’ Compensation Scheme Regulations
2010. In the Isle of Man, Santander UK plc’s principal place of business is at
19/21 Prospect Hill, Douglas, Isle of Man, IM1 1ET. Santander and the flame logo
are registered trademarks.
Santander Asset Finance plc. Reg. No. 1533123. Registered Office: 2 Triton
Square, Regent’s Place, London NW1 3AN. Registered in England. Santander
Corporate & Commercial is a brand name used by Santander UK plc, Abbey National
Treasury Services plc and Santander Asset Finance plc.
Ref:[PDB#1-4A]


RE: Spark App -Yarn-Cluster-Mode ===> Hadoop_conf_**.zip file.

2016-01-15 Thread Siddharth Ubale
Hi,


This is the log from the application :

16/01/15 19:23:19 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster 
with SUCCEEDED (diag message: Shutdown hook called before final status was 
reported.)
16/01/15 19:23:19 INFO yarn.ApplicationMaster: Deleting staging directory 
.sparkStaging/application_1452763526769_0011
16/01/15 19:23:19 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Shutting down remote daemon.
16/01/15 19:23:19 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote 
daemon shut down; proceeding with flushing remote transports.
16/01/15 19:23:19 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Remoting shut down.
16/01/15 19:23:19 INFO util.Utils: Shutdown hook called
16/01/15 19:23:19 INFO client.HConnectionManager$HConnectionImplementation: 
Closing zookeeper sessionid=0x1523f753f6f0061
16/01/15 19:23:19 INFO zookeeper.ClientCnxn: EventThread shut down
16/01/15 19:23:19 INFO zookeeper.ZooKeeper: Session: 0x1523f753f6f0061 closed
16/01/15 19:23:19 ERROR yarn.ApplicationMaster: User class threw exception: 
java.lang.NoSuchMethodError: 
org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Put;
java.lang.NoSuchMethodError: 
org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Put;
at 
org.apache.phoenix.schema.PTableImpl$PRowImpl.newMutations(PTableImpl.java:639)
at 
org.apache.phoenix.schema.PTableImpl$PRowImpl.(PTableImpl.java:632)
at 
org.apache.phoenix.schema.PTableImpl.newRow(PTableImpl.java:557)
at 
org.apache.phoenix.schema.PTableImpl.newRow(PTableImpl.java:573)
at 
org.apache.phoenix.execute.MutationState.addRowMutations(MutationState.java:185)
at 
org.apache.phoenix.execute.MutationState.access$200(MutationState.java:79)
at 
org.apache.phoenix.execute.MutationState$2.init(MutationState.java:258)
at 
org.apache.phoenix.execute.MutationState$2.(MutationState.java:255)
at 
org.apache.phoenix.execute.MutationState.toMutations(MutationState.java:253)
at 
org.apache.phoenix.execute.MutationState.toMutations(MutationState.java:243)
at 
org.apache.phoenix.schema.MetaDataClient.createTableInternal(MetaDataClient.java:1840)
at 
org.apache.phoenix.schema.MetaDataClient.createTable(MetaDataClient.java:744)
at 
org.apache.phoenix.compile.CreateTableCompiler$2.execute(CreateTableCompiler.java:186)
at 
org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:303)
at 
org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:295)
at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
at 
org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:293)
at 
org.apache.phoenix.jdbc.PhoenixStatement.executeUpdate(PhoenixStatement.java:1236)
at 
org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1891)
at 
org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
at 
org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
at 
org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
at 
org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
at 
org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
at 
org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:270)
at 
spark.phoenix.PhoenixConnect.getConnection(PhoenixConnect.java:26)
at spark.stream.eventStream.startStream(eventStream.java:105)
at 
time.series.wo.agg.InputStreamSpark.main(InputStreamSpark.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:483)
Thanks,
Siddharth


From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Friday, January 15, 2016 7:43 PM
To: Siddharth Ubale 
Cc: user@spark.apache.org
Subject: Re: Spark App -Yarn-Cluster-Mode ===> Hadoop_conf_**.zip file.

bq. check application tracking 

Re: Serialization stack error

2016-01-15 Thread beeshma r
Hi Ted ,

Any suggestions for changing this piece of code?

public static JavaRDD getdocs(JavaRDD<
SolrDocumentList> li)
{

JavaRDD newdocs=li;

JavaRDD n=newdocs.map(new Function(){

public Get call(SolrDocumentList si) throws IOException
{
Get get = null;

for (SolrDocument doc : si) {
get = new Get(Bytes.toBytes(((String)
doc.getFieldValue("id";

}

return get;

}


}



On Fri, Jan 15, 2016 at 6:40 AM, Ted Yu  wrote:

> Here is signature for Get:
>
> public class Get extends Query
>
>   implements Row, Comparable {
>
> It is not Serializable.
>
>
> FYI
>
> On Fri, Jan 15, 2016 at 6:37 AM, beeshma r  wrote:
>
>> HI i am trying to get data from Solr server .
>>
>> This is my code
>>
>> /*input is JavaRDD li
>> *output is  JavaRDD for scanning Hbase*/
>>
>>
>> public static JavaRDD getdocs(JavaRDD li)
>> {
>>
>> JavaRDD newdocs=li;
>>
>> JavaRDD n=newdocs.map(new Function(){
>>
>> public Get call(SolrDocumentList si) throws IOException
>> {
>> Get get = null;
>>
>> for (SolrDocument doc : si) {
>> get = new Get(Bytes.toBytes(((String)
>> doc.getFieldValue("id";
>>
>> }
>>
>> return get;
>>
>> }
>>
>>
>> }
>>
>>
>> issue am getting below error
>>
>> 16/01/15 06:05:55 INFO TaskSetManager: Starting task 0.0 in stage 0.0
>> (TID 0, localhost, PROCESS_LOCAL, 2815 bytes)
>> 16/01/15 06:05:55 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
>> 16/01/15 06:05:55 INFO HttpClientUtil: Creating new http client,
>> config:maxConnections=128=32=false
>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>> config:maxConnections=128=32=false
>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>> config:maxConnections=128=32=false
>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>> config:maxConnections=128=32=false
>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>> config:maxConnections=128=32=false
>> 16/01/15 06:05:56 INFO HttpClientUtil: Creating new http client,
>> config:maxConnections=128=32=false
>> 16/01/15 06:05:57 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
>> 0)
>>
>>
>>
>> *java.io.NotSerializableException:
>> org.apache.hadoop.hbase.client.GetSerialization stack:- object not
>> serializable (class: org.apache.hadoop.hbase.client.Get, value:
>> {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
>> - element of array (index: 0)*
>> - array (class [Ljava.lang.Object;, size 6)
>> at
>> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>> at
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>> at
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:724)
>> 16/01/15 06:05:57 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had
>> a not serializable result: org.apache.hadoop.hbase.client.Get
>> Serialization stack:
>> - object not serializable (class: org.apache.hadoop.hbase.client.Get,
>> value:
>> {"timeRange":[0,9223372036854775807],"totalColumns":0,"cacheBlocks":true,"families":{},"maxVersions":1,"row":"row4"})
>> - element of array (index: 0)
>> - array (class [Ljava.lang.Object;, size 6); not retrying
>> 16/01/15 06:05:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
>> tasks have all completed, from pool
>> 16/01/15 06:05:57 INFO TaskSchedulerImpl: Cancelling stage 0
>> 16/01/15 06:05:57 INFO DAGScheduler: ResultStage 0 (collect at
>> App.java:278) failed in 2.481 s
>> 16/01/15 06:05:57 INFO DAGScheduler: Job 0 failed: collect at
>> App.java:278, took 3.378240 s
>> [WARNING]
>> java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>> at java.lang.Thread.run(Thread.java:724)
>> Caused by: org.apache.spark.SparkException: Job aborted due to stage
>> failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable 

Re: SparkContext SyntaxError: invalid syntax

2016-01-15 Thread Andrew Weiner
Indeed!  Here is the output when I run in cluster mode:

Traceback (most recent call last):
  File "pi.py", line 22, in ?
raise RuntimeError("\n"+str(sys.version_info) +"\n"+
RuntimeError:
(2, 4, 3, 'final', 0)
[('PYSPARK_GATEWAY_PORT', '48079'), ('PYTHONPATH',
'/scratch2/hadoop/yarn/local/usercache//filecache/116/spark-assembly-1.6.0-hadoop2.4.0.jar:/home//spark-1.6.0-bin-hadoop2.4/python:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/py4j-0.9-src.zip'),
('PYTHONUNBUFFERED', 'YES')]

As we suspected, it is using Python 2.4

One thing that surprises me is that PYSPARK_PYTHON is not showing up
in the list, even though I am setting it and exporting it in
spark-submit *and* in spark-env.sh.  Is there somewhere else I need to
set this variable?  Maybe in one of the hadoop conf files in my
HADOOP_CONF_DIR?

Andrew



On Thu, Jan 14, 2016 at 1:14 PM, Bryan Cutler  wrote:

> It seems like it could be the case that some other Python version is being
> invoked.  To make sure, can you add something like this to the top of the
> .py file you are submitting to get some more info about how the application
> master is configured?
>
> import sys, os
> raise RuntimeError("\n"+str(sys.version_info) +"\n"+
> str([(k,os.environ[k]) for k in os.environ if "PY" in k]))
>
> On Thu, Jan 14, 2016 at 8:37 AM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
>> Hi Bryan,
>>
>> I ran "$> python --version" on every node on the cluster, and it is
>> Python 2.7.8 for every single one.
>>
>> When I try to submit the Python example in client mode
>> * ./bin/spark-submit  --master yarn --deploy-mode client
>> --driver-memory 4g --executor-memory 2g --executor-cores 1
>> ./examples/src/main/python/pi.py 10*
>> That's when I get this error that I mentioned:
>>
>> 16/01/14 10:09:10 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>> 0.0 (TID 0, mundonovo-priv): org.apache.spark.SparkException:
>> Error from python worker:
>>   python: module pyspark.daemon not found
>> PYTHONPATH was:
>>
>> /scratch5/hadoop/yarn/local/usercache//filecache/48/spark-assembly-1.6.0-hadoop2.4.0.jar:/home/aqualab/spark-1.6.0-bin-hadoop2.4/python:/home/jpr123/hg.pacific/python-common:/home/jp
>>
>> r123/python-libs:/home/jpr123/lib/python2.7/site-packages:/home/zsb739/local/lib/python2.7/site-packages:/home/jpr123/mobile-cdn-analysis:/home//lib/python2.7/site-packages:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0187/container_1450370639491_0187_01_02/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0187/container_1450370639491_0187_01_02/py4j-0.9-src.zip
>> java.io.EOFException
>> at java.io.DataInputStream.readInt(DataInputStream.java:392)
>> at
>> org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:164)
>> at []
>>
>> followed by several more similar errors that also say:
>> Error from python worker:
>>   python: module pyspark.daemon not found
>>
>>
>> Even though the default python appeared to be correct, I just went ahead
>> and explicitly set PYSPARK_PYTHON in conf/spark-env.sh to the path of the
>> default python binary executable.  After making this change I was able to
>> run the job successfully in client!  That is, this appeared to fix the
>> "pyspark.daemon not found" error when running in client mode.
>>
>> However, when running in cluster mode, I am still getting the same syntax
>> error:
>>
>> Traceback (most recent call last):
>>   File "pi.py", line 24, in ?
>> from pyspark import SparkContext
>>   File 
>> "/home//spark-1.6.0-bin-hadoop2.4/python/pyspark/__init__.py", 
>> line 61
>> indent = ' ' * (min(len(m) for m in indents) if indents else 0)
>>   ^
>> SyntaxError: invalid syntax
>>
>> Is it possible that the PYSPARK_PYTHON environment variable is ignored when 
>> jobs are submitted in cluster mode?  It seems that Spark or Yarn is going 
>> behind my back, so to speak, and using some older version of python I didn't 
>> even know was installed.
>>
>> Thanks again for all your help thus far.  We are getting close
>>
>> Andrew
>>
>>
>>
>> On Wed, Jan 13, 2016 at 6:13 PM, Bryan Cutler  wrote:
>>
>>> Hi Andrew,
>>>
>>> There are a couple of things to check.  First, is Python 2.7 the default
>>> version on all nodes in the cluster or is it an alternate install? Meaning
>>> what is the output of this command "$>  python --version"  If it is an
>>> alternate install, you could set the environment variable "
>>> PYSPARK_PYTHON" Python binary executable to use for PySpark in both
>>> driver and workers (default is python).
>>>
>>> Did you try 

Spark Streaming: routing by key without groupByKey

2016-01-15 Thread Lin Zhao
I have requirement to route a paired DStream to a series of map and flatMap 
such that entries with the same key goes to the same thread within the same 
batch. Closest I can come up with is groupByKey().flatMap(_._2). But this kills 
throughput by 50%.

When I think about it groupByKey is more than I need. With groupByKey the same 
thread sees all events with key Alice at a time, and only Alice. For my 
requirement if there are Bob, Charlie in between it's still OK. This seems to 
be a common routing requirement and shouldn't cause 50% performance hit. Is 
there a way to construct the stream in such way that I'm not aware of?

I have read 
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
 but reduceByKey isn't the solution since we are not doing aggregation. Our 
stream is a chain of map and flatMap[withState]


Re: DataFrame partitionBy to a single Parquet file (per partition)

2016-01-15 Thread Cheng Lian
You may try DataFrame.repartition(partitionExprs: Column*) to shuffle 
all data belonging to a single (data) partition into a single (RDD) 
partition:


|df.coalesce(1)|||.repartition("entity", "year", "month", "day", 
"status")|.write.partitionBy("entity", "year", "month", "day", 
"status").mode(SaveMode.Append).parquet(s"$location")|


(Unfortunately the naming here can be quite confusing.)

Cheng

On 1/14/16 11:48 PM, Patrick McGloin wrote:

Hi,

I would like to reparation / coalesce my data so that it is saved into 
one Parquet file per partition. I would also like to use the Spark SQL 
partitionBy API. So I could do that like this:


|df.coalesce(1).write.partitionBy("entity", "year", "month", "day", 
"status").mode(SaveMode.Append).parquet(s"$location") |


I've tested this and it doesn't seem to perform well. This is because 
there is only one partition to work on in the dataset and all the 
partitioning, compression and saving of files has to be done by one 
CPU core.


I could rewrite this to do the partitioning manually (using filter 
with the distinct partition values for example) before calling coalesce.


But is there a better way to do this using the standard Spark SQL API?

Best regards,

Patrick






RE: jobs much slower in cluster mode vs local

2016-01-15 Thread Saif.A.Ellafi
Thank you, this looks useful indeed for what I have in mind.

Saif

From: Jiří Syrový [mailto:syrovy.j...@gmail.com]
Sent: Friday, January 15, 2016 12:06 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: jobs much slower in cluster mode vs local

Hi,

you can try to use spark job server and submit jobs to it. The thing is that 
the most expensive part is context creation.
J.

2016-01-15 15:28 GMT+01:00 
>:
Hello,

In general, I am usually able to run spark submit jobs in local mode, in a 
32-cores node with plenty of memory ram. The performance is significantly 
faster in local mode than when using a cluster of spark workers.

How can this be explained and what measures can one take in order to improve 
such performance?
Usually a job that takes 35 seconds in local mode takes around 48 seconds in a 
small cluster.

Thanks,
Saif




Re: sqlContext.cacheTable("tableName") vs dataFrame.cache()

2016-01-15 Thread Kevin Mellott
Hi George,

I believe that sqlContext.cacheTable("tableName") is to be used when you
want to cache the data that is being used within a Spark SQL query. For
example, take a look at the code below.


> val myData = sqlContext.load("com.databricks.spark.csv", Map("path" ->
> "hdfs://somepath/file", "header" -> "false").toDF("col1", "col2")
>
myData.registerTempTable("myData")


Here, the usage of *cache()* will affect ONLY the *myData.select* query.

> myData.cache()

myData.select("col1", "col2").show()


Here, the usage of *cacheTable* will affect ONLY the *sqlContext.sql* query.

> sqlContext.cacheTable("myData")

sqlContext.sql("SELECT col1, col2 FROM myData").show()


Thanks,
Kevin

On Fri, Jan 15, 2016 at 7:00 AM, George Sigletos 
wrote:

> According to the documentation they are exactly the same, but in my
> queries
>
> dataFrame.cache()
>
> results in much faster execution times vs doing
>
> sqlContext.cacheTable("tableName")
>
> Is there any explanation about this? I am not caching the RDD prior to
> creating the dataframe. Using Pyspark on Spark 1.5.2
>
> Kind regards,
> George
>


Feature importance for RandomForestRegressor in Spark 1.5

2016-01-15 Thread Scott Imig
Hello,

I have a couple of quick questions about this pull request, which adds feature 
importance calculations to the random forests in MLLib.

https://github.com/apache/spark/pull/7838

1. Can someone help me determine the Spark version where this is first 
available?  (1.5.0?  1.5.1?)

2. Following the templates in this  documentation to construct a 
RandomForestModel, should I be able to retrieve model.featureImportances?  Or 
is there a different pattern for random forests in more recent spark versions?

https://spark.apache.org/docs/1.2.0/mllib-ensembles.html

Thanks for the help!
Imig
--
S. Imig | Senior Data Scientist Engineer | richrelevance |m: 425.999.5725

I support Bip 101 and BitcoinXT.


Re: Feature importance for RandomForestRegressor in Spark 1.5

2016-01-15 Thread Robin East
re 1.
The pull requests reference the JIRA ticket in this case 
https://issues.apache.org/jira/browse/SPARK-5133 
. The JIRA says it was 
released in 1.5.


---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 






> On 15 Jan 2016, at 16:06, Scott Imig  wrote:
> 
> Hello,
> 
> I have a couple of quick questions about this pull request, which adds 
> feature importance calculations to the random forests in MLLib.
> 
> https://github.com/apache/spark/pull/7838 
> 
> 
> 1. Can someone help me determine the Spark version where this is first 
> available?  (1.5.0?  1.5.1?)
> 
> 2. Following the templates in this  documentation to construct a 
> RandomForestModel, should I be able to retrieve model.featureImportances?  Or 
> is there a different pattern for random forests in more recent spark versions?
> 
> https://spark.apache.org/docs/1.2.0/mllib-ensembles.html 
> 
> 
> Thanks for the help!
> Imig
> -- 
> S. Imig | Senior Data Scientist Engineer | richrelevance |m: 425.999.5725
> 
> I support Bip 101 and BitcoinXT .



Re: simultaneous actions

2016-01-15 Thread Sean Owen
Can you run N jobs depending on the same RDD in parallel on the
driver? certainly. The context / scheduling is thread-safe and the RDD
is immutable. I've done this to, for example, build and evaluate a
bunch of models simultaneously on a big cluster.

On Fri, Jan 15, 2016 at 7:10 PM, Jakob Odersky  wrote:
> I don't think RDDs are threadsafe.
> More fundamentally however, why would you want to run RDD actions in
> parallel? The idea behind RDDs is to provide you with an abstraction for
> computing parallel operations on distributed data. Even if you were to call
> actions from several threads at once, the individual executors of your spark
> environment would still have to perform operations sequentially.
>
> As an alternative, I would suggest to restructure your RDD transformations
> to compute the required results in one single operation.
>
> On 15 January 2016 at 06:18, Jonathan Coveney  wrote:
>>
>> Threads
>>
>>
>> El viernes, 15 de enero de 2016, Kira  escribió:
>>>
>>> Hi,
>>>
>>> Can we run *simultaneous* actions on the *same RDD* ?; if yes how can
>>> this
>>> be done ?
>>>
>>> Thank you,
>>> Regards
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame partitionBy to a single Parquet file (per partition)

2016-01-15 Thread Patrick McGloin
I will try this in Monday. Thanks for the tip.

On Fri, 15 Jan 2016, 18:58 Cheng Lian  wrote:

> You may try DataFrame.repartition(partitionExprs: Column*) to shuffle all
> data belonging to a single (data) partition into a single (RDD) partition:
>
> df.coalesce(1).repartition("entity", "year", "month", "day", 
> "status").write.partitionBy("entity", "year", "month", "day", 
> "status").mode(SaveMode.Append).parquet(s"$location")
>
> (Unfortunately the naming here can be quite confusing.)
>
>
> Cheng
>
>
> On 1/14/16 11:48 PM, Patrick McGloin wrote:
>
> Hi,
>
> I would like to reparation / coalesce my data so that it is saved into one
> Parquet file per partition. I would also like to use the Spark SQL
> partitionBy API. So I could do that like this:
>
> df.coalesce(1).write.partitionBy("entity", "year", "month", "day", 
> "status").mode(SaveMode.Append).parquet(s"$location")
>
> I've tested this and it doesn't seem to perform well. This is because
> there is only one partition to work on in the dataset and all the
> partitioning, compression and saving of files has to be done by one CPU
> core.
>
> I could rewrite this to do the partitioning manually (using filter with
> the distinct partition values for example) before calling coalesce.
>
> But is there a better way to do this using the standard Spark SQL API?
>
> Best regards,
>
> Patrick
>
>
>
>


Re: Spark Streaming + Kafka + scala job message read issue

2016-01-15 Thread vivek.meghanathan
All,
The issue was related to apache Cassandra. I have changed the Cassandra to 
datastax Cassandra and the issue is resolved. Also I have changed the spark 
version to 1.3.

There is some serious issue is there between spark Cassandra connector and 
apache Cassandra 2.1+ while using in spark streaming jobs.

Regards
Vivek

On Tue, Jan 05, 2016 at 4:38 pm, Vivek Meghanathan (WT01 - NEP) 
> wrote:

Hello All,

After investigating further using a test program, we were able to read the 
kafka input messages using spark streaming.

Once we add a particular line which performs map and reduce – and groupByKey 
(all written in single line), we are not seeing the input message details in 
the logs. We have increased the batch interval to 5 seconds and removed the 
numtasks (it was defined as 10) . Once we made this change the kafka messages 
started to get processed . But it takes long time to process.

This works fine in our local lab server but the problem in the google compute 
engine server. The local lab server is low in spec 8 cpu with 8GB ram but the 
cloud server is high memory one 30GB RAM and 8 CPU. As far as I could see the 
execution happens much faster in google platform but somehow the job processing 
getting messed up.

Any suggestions?


Regards,
Vivek M



From: Vivek Meghanathan (WT01 - NEP)
Sent: 27 December 2015 11:08
To: Bryan 
Cc: Vivek Meghanathan (WT01 - NEP) ; 
duc.was.h...@gmail.com; user@spark.apache.org
Subject: Re: Spark Streaming + Kafka + scala job message read issue


Hi Bryan,
Yes we are using only 1 thread per topic as we have only one Kafka server with 
1 partition.
What kind of logs will tell us what offset spark stream is reading from Kafka 
or is it resetting something without reading?

Regards
Vivek


Sent using CloudMagic 
Email
On Sun, Dec 27, 2015 at 12:03 am, Bryan 
> wrote:

Vivek,

Where you’re using numThreads – look at the documentation for createStream. I 
believe that number should be the number of partitions to consume.

Sent from Outlook Mail for 
Windows 10 phone


From: vivek.meghanat...@wipro.com
Sent: Friday, December 25, 2015 11:39 PM
To: bryan.jeff...@gmail.com
Cc: duc.was.h...@gmail.com; 
vivek.meghanat...@wipro.com; 
user@spark.apache.org
Subject: Re: Spark Streaming + Kafka + scala job message read issue


Hi Brian,PhuDuc,

All 8 jobs are consuming 8 different IN topics. 8 different Scala jobs running 
each topic map mentioned below has only 1 thread number mentioned. In this case 
group should not be a problem right.

Here is the complete flow, spring MVC sends in messages to Kafka , spark 
streaming reading that and sends message back to Kafka, some cases they will 
update data to Cassandra only. Spring the response messages.
I could see the message is always reaching Kafka (checked through the console 
consumer).

Regards
Vivek


Sent using CloudMagic 
Email
On Sat, Dec 26, 2015 at 2:42 am, Bryan 
> wrote:

Agreed. I did not see that they were using the same group name.

Sent from Outlook Mail for 
Windows 10 phone


From: PhuDuc Nguyen
Sent: Friday, December 25, 2015 3:35 PM
To: vivek.meghanat...@wipro.com
Cc: user@spark.apache.org
Subject: Re: Spark Streaming + Kafka + scala job message read issue

Vivek,

Did you say you have 8 spark jobs that are consuming from the same topic and 
all jobs are using the same consumer group name? If so, each job would get a 
subset of messages from that kafka topic, ie each job would get 1 out of 8 
messages from that topic. Is that your intent?

regards,
Duc






On Thu, Dec 24, 2015 at 7:20 AM, 
> wrote:
We are using the older receiver based approach, the number of partitions is 1 
(we have a single node kafka) and we use single thread per topic still we have 
the problem. Please see the API we use. All 8 spark jobs use same group name – 
is that a problem?

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap  - Number of 
threads used here is 1
val searches = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(line 
=> parse(line._2).extract[Search])


Regards,
Vivek M
From: Bryan [mailto:bryan.jeff...@gmail.com]
Sent: 24 December 2015 17:20
To: Vivek Meghanathan (WT01 - NEP) 

Re: simultaneous actions

2016-01-15 Thread Jakob Odersky
I don't think RDDs are threadsafe.
More fundamentally however, why would you want to run RDD actions in
parallel? The idea behind RDDs is to provide you with an abstraction for
computing parallel operations on distributed data. Even if you were to call
actions from several threads at once, the individual executors of your
spark environment would still have to perform operations sequentially.

As an alternative, I would suggest to restructure your RDD transformations
to compute the required results in one single operation.

On 15 January 2016 at 06:18, Jonathan Coveney  wrote:

> Threads
>
>
> El viernes, 15 de enero de 2016, Kira  escribió:
>
>> Hi,
>>
>> Can we run *simultaneous* actions on the *same RDD* ?; if yes how can this
>> be done ?
>>
>> Thank you,
>> Regards
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: simultaneous actions

2016-01-15 Thread Jonathan Coveney
SparkContext is thread safe. And RDDs just describe operations.

While I generally agree that you want to model as much possible as
transformations as possible, this is not always possible. And in that case,
you have no option than to use threads.

Spark's designers should have made all actions return Futures, but alas...

El viernes, 15 de enero de 2016, Jakob Odersky 
escribió:

> I don't think RDDs are threadsafe.
> More fundamentally however, why would you want to run RDD actions in
> parallel? The idea behind RDDs is to provide you with an abstraction for
> computing parallel operations on distributed data. Even if you were to call
> actions from several threads at once, the individual executors of your
> spark environment would still have to perform operations sequentially.
>
> As an alternative, I would suggest to restructure your RDD transformations
> to compute the required results in one single operation.
>
> On 15 January 2016 at 06:18, Jonathan Coveney  > wrote:
>
>> Threads
>>
>>
>> El viernes, 15 de enero de 2016, Kira > > escribió:
>>
>>> Hi,
>>>
>>> Can we run *simultaneous* actions on the *same RDD* ?; if yes how can
>>> this
>>> be done ?
>>>
>>> Thank you,
>>> Regards
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Re: SparkContext SyntaxError: invalid syntax

2016-01-15 Thread Andrew Weiner
I tried playing around with my environment variables, and here is an update.

When I run in cluster mode, my environment variables do not persist
throughout the entire job.
For example, I tried creating a local copy of HADOOP_CONF_DIR in
/home//local/etc/hadoop/conf, and then, in spark-env.sh I the
variable:
export HADOOP_CONF_DIR=/home//local/etc/hadoop/conf

Later, when we print the environment variables in the python code, I see
this:

('HADOOP_CONF_DIR', '/etc/hadoop/conf')

However, when I run in client mode, I see this:

('HADOOP_CONF_DIR', '/home/awp066/local/etc/hadoop/conf')

Furthermore, if I omit that environment variable from spark-env.sh
altogether, I get the expected error in both client and cluster mode:

When running with master 'yarn'
either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

This suggests that my environment variables are being used when I
first submit the job, but at some point during the job, my environment
variables are thrown out and someone's (yarn's?) environment variables
are being used.

Andrew


On Fri, Jan 15, 2016 at 11:03 AM, Andrew Weiner <
andrewweiner2...@u.northwestern.edu> wrote:

> Indeed!  Here is the output when I run in cluster mode:
>
> Traceback (most recent call last):
>   File "pi.py", line 22, in ?
> raise RuntimeError("\n"+str(sys.version_info) +"\n"+
> RuntimeError:
> (2, 4, 3, 'final', 0)
> [('PYSPARK_GATEWAY_PORT', '48079'), ('PYTHONPATH', 
> '/scratch2/hadoop/yarn/local/usercache//filecache/116/spark-assembly-1.6.0-hadoop2.4.0.jar:/home//spark-1.6.0-bin-hadoop2.4/python:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/py4j-0.9-src.zip'),
>  ('PYTHONUNBUFFERED', 'YES')]
>
> As we suspected, it is using Python 2.4
>
> One thing that surprises me is that PYSPARK_PYTHON is not showing up in the 
> list, even though I am setting it and exporting it in spark-submit *and* in 
> spark-env.sh.  Is there somewhere else I need to set this variable?  Maybe in 
> one of the hadoop conf files in my HADOOP_CONF_DIR?
>
> Andrew
>
>
>
> On Thu, Jan 14, 2016 at 1:14 PM, Bryan Cutler  wrote:
>
>> It seems like it could be the case that some other Python version is
>> being invoked.  To make sure, can you add something like this to the top of
>> the .py file you are submitting to get some more info about how the
>> application master is configured?
>>
>> import sys, os
>> raise RuntimeError("\n"+str(sys.version_info) +"\n"+
>> str([(k,os.environ[k]) for k in os.environ if "PY" in k]))
>>
>> On Thu, Jan 14, 2016 at 8:37 AM, Andrew Weiner <
>> andrewweiner2...@u.northwestern.edu> wrote:
>>
>>> Hi Bryan,
>>>
>>> I ran "$> python --version" on every node on the cluster, and it is
>>> Python 2.7.8 for every single one.
>>>
>>> When I try to submit the Python example in client mode
>>> * ./bin/spark-submit  --master yarn --deploy-mode client
>>> --driver-memory 4g --executor-memory 2g --executor-cores 1
>>> ./examples/src/main/python/pi.py 10*
>>> That's when I get this error that I mentioned:
>>>
>>> 16/01/14 10:09:10 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
>>> 0.0 (TID 0, mundonovo-priv): org.apache.spark.SparkException:
>>> Error from python worker:
>>>   python: module pyspark.daemon not found
>>> PYTHONPATH was:
>>>
>>> /scratch5/hadoop/yarn/local/usercache//filecache/48/spark-assembly-1.6.0-hadoop2.4.0.jar:/home/aqualab/spark-1.6.0-bin-hadoop2.4/python:/home/jpr123/hg.pacific/python-common:/home/jp
>>>
>>> r123/python-libs:/home/jpr123/lib/python2.7/site-packages:/home/zsb739/local/lib/python2.7/site-packages:/home/jpr123/mobile-cdn-analysis:/home//lib/python2.7/site-packages:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0187/container_1450370639491_0187_01_02/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0187/container_1450370639491_0187_01_02/py4j-0.9-src.zip
>>> java.io.EOFException
>>> at java.io.DataInputStream.readInt(DataInputStream.java:392)
>>> at
>>> org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:164)
>>> at []
>>>
>>> followed by several more similar errors that also say:
>>> Error from python worker:
>>>   python: module pyspark.daemon not found
>>>
>>>
>>> Even though the default python appeared to be correct, I just went ahead
>>> and explicitly set PYSPARK_PYTHON in conf/spark-env.sh to the path of the
>>> default python binary executable.  After making this change I was able to
>>> run the job successfully in client!  That is, this appeared to fix the
>>> "pyspark.daemon not found" error when running in client mode.
>>>
>>> However, when running in cluster mode, I am still getting the same
>>> syntax 

RE: Is it possible to use SparkSQL JDBC ThriftServer without Hive

2016-01-15 Thread Sambit Tripathy (RBEI/EDS1)
Hi Mohammed,

I think this is something you can do at the Thrift server startup. So this 
would run an instance of Derby and act as a Metastore. Any idea if this Debry 
Metastore will have distributed access and why do we use the Hive Metastore 
then?

@Angela: I would  also be happy to have a metastore owned by Spark Thrift 
Server. What are you trying to achieve by using the Thrift server without Hive?


Regards,
Sambit.


-Original Message-
From: Mohammed Guller [mailto:moham...@glassbeam.com] 
Sent: Wednesday, January 13, 2016 2:54 PM
To: angela.whelan ; user@spark.apache.org
Subject: RE: Is it possible to use SparkSQL JDBC ThriftServer without Hive

Hi Angela,
Yes, you can use Spark SQL JDBC/ThriftServer without Hive.

Mohammed


-Original Message-
From: angela.whelan [mailto:angela.whe...@synchronoss.com] 
Sent: Wednesday, January 13, 2016 3:37 AM
To: user@spark.apache.org
Subject: Is it possible to use SparkSQL JDBC ThriftServer without Hive

hi,
I'm wondering if it is possible to use the SparkSQL JDBC ThriftServer without 
Hive?

The reason I'm asking is that we are unsure about the speed of Hive with 
SparkSQL JDBC connectivity.

I can't find any article online about using SparkSQL JDBC ThriftServer without 
Hive.

Many thanks in advance for any help on this.

Thanks, Angela



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-SparkSQL-JDBC-ThriftServer-without-Hive-tp25959.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: simultaneous actions

2016-01-15 Thread Matei Zaharia
RDDs actually are thread-safe, and quite a few applications use them this way, 
e.g. the JDBC server.

Matei

> On Jan 15, 2016, at 2:10 PM, Jakob Odersky  wrote:
> 
> I don't think RDDs are threadsafe.
> More fundamentally however, why would you want to run RDD actions in 
> parallel? The idea behind RDDs is to provide you with an abstraction for 
> computing parallel operations on distributed data. Even if you were to call 
> actions from several threads at once, the individual executors of your spark 
> environment would still have to perform operations sequentially.
> 
> As an alternative, I would suggest to restructure your RDD transformations to 
> compute the required results in one single operation.
> 
> On 15 January 2016 at 06:18, Jonathan Coveney  > wrote:
> Threads
> 
> 
> El viernes, 15 de enero de 2016, Kira  > escribió:
> Hi,
> 
> Can we run *simultaneous* actions on the *same RDD* ?; if yes how can this
> be done ?
> 
> Thank you,
> Regards
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org <>
> For additional commands, e-mail: user-h...@spark.apache.org <>
> 
> 



Re: SparkContext SyntaxError: invalid syntax

2016-01-15 Thread Andrew Weiner
Actually, I just found this [
https://issues.apache.org/jira/browse/SPARK-1680], which after a bit of
googling and reading leads me to believe that the preferred way to change
the yarn environment is to edit the spark-defaults.conf file by adding this
line:
spark.yarn.appMasterEnv.PYSPARK_PYTHON/path/to/python

While both this solution and the solution from my prior email work, I
believe this is the preferred solution.

Sorry for the flurry of emails.  Again, thanks for all the help!

Andrew

On Fri, Jan 15, 2016 at 1:47 PM, Andrew Weiner <
andrewweiner2...@u.northwestern.edu> wrote:

> I finally got the pi.py example to run in yarn cluster mode.  This was the
> key insight:
> https://issues.apache.org/jira/browse/SPARK-9229
>
> I had to set SPARK_YARN_USER_ENV in spark-env.sh:
> export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=/home/aqualab/local/bin/python"
>
> This caused the PYSPARK_PYTHON environment variable to be used in my yarn
> environment in cluster mode.
>
> Thank you for all your help!
>
> Best,
> Andrew
>
>
>
> On Fri, Jan 15, 2016 at 12:57 PM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
>> I tried playing around with my environment variables, and here is an
>> update.
>>
>> When I run in cluster mode, my environment variables do not persist
>> throughout the entire job.
>> For example, I tried creating a local copy of HADOOP_CONF_DIR in
>> /home//local/etc/hadoop/conf, and then, in spark-env.sh I the
>> variable:
>> export HADOOP_CONF_DIR=/home//local/etc/hadoop/conf
>>
>> Later, when we print the environment variables in the python code, I see
>> this:
>>
>> ('HADOOP_CONF_DIR', '/etc/hadoop/conf')
>>
>> However, when I run in client mode, I see this:
>>
>> ('HADOOP_CONF_DIR', '/home/awp066/local/etc/hadoop/conf')
>>
>> Furthermore, if I omit that environment variable from spark-env.sh 
>> altogether, I get the expected error in both client and cluster mode:
>>
>> When running with master 'yarn'
>> either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
>>
>> This suggests that my environment variables are being used when I first 
>> submit the job, but at some point during the job, my environment variables 
>> are thrown out and someone's (yarn's?) environment variables are being used.
>>
>> Andrew
>>
>>
>> On Fri, Jan 15, 2016 at 11:03 AM, Andrew Weiner <
>> andrewweiner2...@u.northwestern.edu> wrote:
>>
>>> Indeed!  Here is the output when I run in cluster mode:
>>>
>>> Traceback (most recent call last):
>>>   File "pi.py", line 22, in ?
>>> raise RuntimeError("\n"+str(sys.version_info) +"\n"+
>>> RuntimeError:
>>> (2, 4, 3, 'final', 0)
>>> [('PYSPARK_GATEWAY_PORT', '48079'), ('PYTHONPATH', 
>>> '/scratch2/hadoop/yarn/local/usercache//filecache/116/spark-assembly-1.6.0-hadoop2.4.0.jar:/home//spark-1.6.0-bin-hadoop2.4/python:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/py4j-0.9-src.zip'),
>>>  ('PYTHONUNBUFFERED', 'YES')]
>>>
>>> As we suspected, it is using Python 2.4
>>>
>>> One thing that surprises me is that PYSPARK_PYTHON is not showing up in the 
>>> list, even though I am setting it and exporting it in spark-submit *and* in 
>>> spark-env.sh.  Is there somewhere else I need to set this variable?  Maybe 
>>> in one of the hadoop conf files in my HADOOP_CONF_DIR?
>>>
>>> Andrew
>>>
>>>
>>>
>>> On Thu, Jan 14, 2016 at 1:14 PM, Bryan Cutler  wrote:
>>>
 It seems like it could be the case that some other Python version is
 being invoked.  To make sure, can you add something like this to the top of
 the .py file you are submitting to get some more info about how the
 application master is configured?

 import sys, os
 raise RuntimeError("\n"+str(sys.version_info) +"\n"+
 str([(k,os.environ[k]) for k in os.environ if "PY" in k]))

 On Thu, Jan 14, 2016 at 8:37 AM, Andrew Weiner <
 andrewweiner2...@u.northwestern.edu> wrote:

> Hi Bryan,
>
> I ran "$> python --version" on every node on the cluster, and it is
> Python 2.7.8 for every single one.
>
> When I try to submit the Python example in client mode
> * ./bin/spark-submit  --master yarn --deploy-mode client
> --driver-memory 4g --executor-memory 2g --executor-cores 1
> ./examples/src/main/python/pi.py 10*
> That's when I get this error that I mentioned:
>
> 16/01/14 10:09:10 WARN scheduler.TaskSetManager: Lost task 0.0 in
> stage 0.0 (TID 0, mundonovo-priv): org.apache.spark.SparkException:
> Error from python worker:
>   python: module pyspark.daemon not found
> PYTHONPATH was:
>
> 

Re: simultaneous actions

2016-01-15 Thread Koert Kuipers
we run multiple actions on the same (cached) rdd all the time, i guess in
different threads indeed (its in akka)

On Fri, Jan 15, 2016 at 2:40 PM, Matei Zaharia 
wrote:

> RDDs actually are thread-safe, and quite a few applications use them this
> way, e.g. the JDBC server.
>
> Matei
>
> On Jan 15, 2016, at 2:10 PM, Jakob Odersky  wrote:
>
> I don't think RDDs are threadsafe.
> More fundamentally however, why would you want to run RDD actions in
> parallel? The idea behind RDDs is to provide you with an abstraction for
> computing parallel operations on distributed data. Even if you were to call
> actions from several threads at once, the individual executors of your
> spark environment would still have to perform operations sequentially.
>
> As an alternative, I would suggest to restructure your RDD transformations
> to compute the required results in one single operation.
>
> On 15 January 2016 at 06:18, Jonathan Coveney  wrote:
>
>> Threads
>>
>>
>> El viernes, 15 de enero de 2016, Kira  escribió:
>>
>>> Hi,
>>>
>>> Can we run *simultaneous* actions on the *same RDD* ?; if yes how can
>>> this
>>> be done ?
>>>
>>> Thank you,
>>> Regards
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>>> .
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>
>


Re: SparkContext SyntaxError: invalid syntax

2016-01-15 Thread Andrew Weiner
I finally got the pi.py example to run in yarn cluster mode.  This was the
key insight:
https://issues.apache.org/jira/browse/SPARK-9229

I had to set SPARK_YARN_USER_ENV in spark-env.sh:
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=/home/aqualab/local/bin/python"

This caused the PYSPARK_PYTHON environment variable to be used in my yarn
environment in cluster mode.

Thank you for all your help!

Best,
Andrew



On Fri, Jan 15, 2016 at 12:57 PM, Andrew Weiner <
andrewweiner2...@u.northwestern.edu> wrote:

> I tried playing around with my environment variables, and here is an
> update.
>
> When I run in cluster mode, my environment variables do not persist
> throughout the entire job.
> For example, I tried creating a local copy of HADOOP_CONF_DIR in
> /home//local/etc/hadoop/conf, and then, in spark-env.sh I the
> variable:
> export HADOOP_CONF_DIR=/home//local/etc/hadoop/conf
>
> Later, when we print the environment variables in the python code, I see
> this:
>
> ('HADOOP_CONF_DIR', '/etc/hadoop/conf')
>
> However, when I run in client mode, I see this:
>
> ('HADOOP_CONF_DIR', '/home/awp066/local/etc/hadoop/conf')
>
> Furthermore, if I omit that environment variable from spark-env.sh 
> altogether, I get the expected error in both client and cluster mode:
>
> When running with master 'yarn'
> either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
>
> This suggests that my environment variables are being used when I first 
> submit the job, but at some point during the job, my environment variables 
> are thrown out and someone's (yarn's?) environment variables are being used.
>
> Andrew
>
>
> On Fri, Jan 15, 2016 at 11:03 AM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
>> Indeed!  Here is the output when I run in cluster mode:
>>
>> Traceback (most recent call last):
>>   File "pi.py", line 22, in ?
>> raise RuntimeError("\n"+str(sys.version_info) +"\n"+
>> RuntimeError:
>> (2, 4, 3, 'final', 0)
>> [('PYSPARK_GATEWAY_PORT', '48079'), ('PYTHONPATH', 
>> '/scratch2/hadoop/yarn/local/usercache//filecache/116/spark-assembly-1.6.0-hadoop2.4.0.jar:/home//spark-1.6.0-bin-hadoop2.4/python:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/py4j-0.9-src.zip'),
>>  ('PYTHONUNBUFFERED', 'YES')]
>>
>> As we suspected, it is using Python 2.4
>>
>> One thing that surprises me is that PYSPARK_PYTHON is not showing up in the 
>> list, even though I am setting it and exporting it in spark-submit *and* in 
>> spark-env.sh.  Is there somewhere else I need to set this variable?  Maybe 
>> in one of the hadoop conf files in my HADOOP_CONF_DIR?
>>
>> Andrew
>>
>>
>>
>> On Thu, Jan 14, 2016 at 1:14 PM, Bryan Cutler  wrote:
>>
>>> It seems like it could be the case that some other Python version is
>>> being invoked.  To make sure, can you add something like this to the top of
>>> the .py file you are submitting to get some more info about how the
>>> application master is configured?
>>>
>>> import sys, os
>>> raise RuntimeError("\n"+str(sys.version_info) +"\n"+
>>> str([(k,os.environ[k]) for k in os.environ if "PY" in k]))
>>>
>>> On Thu, Jan 14, 2016 at 8:37 AM, Andrew Weiner <
>>> andrewweiner2...@u.northwestern.edu> wrote:
>>>
 Hi Bryan,

 I ran "$> python --version" on every node on the cluster, and it is
 Python 2.7.8 for every single one.

 When I try to submit the Python example in client mode
 * ./bin/spark-submit  --master yarn --deploy-mode client
 --driver-memory 4g --executor-memory 2g --executor-cores 1
 ./examples/src/main/python/pi.py 10*
 That's when I get this error that I mentioned:

 16/01/14 10:09:10 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
 0.0 (TID 0, mundonovo-priv): org.apache.spark.SparkException:
 Error from python worker:
   python: module pyspark.daemon not found
 PYTHONPATH was:

 /scratch5/hadoop/yarn/local/usercache//filecache/48/spark-assembly-1.6.0-hadoop2.4.0.jar:/home/aqualab/spark-1.6.0-bin-hadoop2.4/python:/home/jpr123/hg.pacific/python-common:/home/jp

 r123/python-libs:/home/jpr123/lib/python2.7/site-packages:/home/zsb739/local/lib/python2.7/site-packages:/home/jpr123/mobile-cdn-analysis:/home//lib/python2.7/site-packages:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0187/container_1450370639491_0187_01_02/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0187/container_1450370639491_0187_01_02/py4j-0.9-src.zip
 java.io.EOFException
 at java.io.DataInputStream.readInt(DataInputStream.java:392)
 at
 org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:164)

Re: simultaneous actions

2016-01-15 Thread Jakob Odersky
I stand corrected. How considerable are the benefits though? Will the
scheduler be able to dispatch jobs from both actions simultaneously (or on
a when-workers-become-available basis)?

On 15 January 2016 at 11:44, Koert Kuipers  wrote:

> we run multiple actions on the same (cached) rdd all the time, i guess in
> different threads indeed (its in akka)
>
> On Fri, Jan 15, 2016 at 2:40 PM, Matei Zaharia 
> wrote:
>
>> RDDs actually are thread-safe, and quite a few applications use them this
>> way, e.g. the JDBC server.
>>
>> Matei
>>
>> On Jan 15, 2016, at 2:10 PM, Jakob Odersky  wrote:
>>
>> I don't think RDDs are threadsafe.
>> More fundamentally however, why would you want to run RDD actions in
>> parallel? The idea behind RDDs is to provide you with an abstraction for
>> computing parallel operations on distributed data. Even if you were to call
>> actions from several threads at once, the individual executors of your
>> spark environment would still have to perform operations sequentially.
>>
>> As an alternative, I would suggest to restructure your RDD
>> transformations to compute the required results in one single operation.
>>
>> On 15 January 2016 at 06:18, Jonathan Coveney  wrote:
>>
>>> Threads
>>>
>>>
>>> El viernes, 15 de enero de 2016, Kira  escribió:
>>>
 Hi,

 Can we run *simultaneous* actions on the *same RDD* ?; if yes how can
 this
 be done ?

 Thank you,
 Regards



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com
 .

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>
>>
>


Re: simultaneous actions

2016-01-15 Thread Sean Owen
It makes sense if you're parallelizing jobs that have relatively few
tasks, and have a lot of execution slots available. It makes sense to
turn them loose all at once and try to use the parallelism available.

There are downsides, eventually: for example, N jobs accessing one
cached RDD may recompute the RDD's partitions many times since the
cached copy may not be available when many of them start. At some
level, way oversubscribing your cluster with a backlog of tasks is
bad. And you might find it's a net loss if a bunch of tasks try to
schedule at the same time that all access the same data, since only
some can be local to the data.

On Fri, Jan 15, 2016 at 8:11 PM, Jakob Odersky  wrote:
> I stand corrected. How considerable are the benefits though? Will the
> scheduler be able to dispatch jobs from both actions simultaneously (or on a
> when-workers-become-available basis)?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrameWriter on partitionBy for parquet eat all RAM

2016-01-15 Thread Michael Armbrust
See here for some workarounds:
https://issues.apache.org/jira/browse/SPARK-12546

On Thu, Jan 14, 2016 at 6:46 PM, Jerry Lam  wrote:

> Hi Arkadiusz,
>
> the partitionBy is not designed to have many distinct value the last time
> I used it. If you search in the mailing list, I think there are couple of
> people also face similar issues. For example, in my case, it won't work
> over a million distinct user ids. It will require a lot of memory and very
> long time to read the table back.
>
> Best Regards,
>
> Jerry
>
> On Thu, Jan 14, 2016 at 2:31 PM, Arkadiusz Bicz 
> wrote:
>
>> Hi
>>
>> What is the proper configuration for saving parquet partition with
>> large number of repeated keys?
>>
>> On bellow code I load 500 milion rows of data and partition it on
>> column with not so many different values.
>>
>> Using spark-shell with 30g per executor and driver and 3 executor cores
>>
>>
>> sqlContext.read.load("hdfs://notpartitioneddata").write.partitionBy("columnname").parquet("partitioneddata")
>>
>>
>> Job failed because not enough memory in executor :
>>
>> WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by
>> YARN for exceeding memory limits. 43.5 GB of 43.5 GB physical memory
>> used. Consider boosting spark.yarn.executor.memoryOverhead.
>> 16/01/14 17:32:38 ERROR YarnScheduler: Lost executor 11 on
>> datanode2.babar.poc: Container killed by YARN for exceeding memory
>> limits. 43.5 GB of 43.5 GB physical memory used. Consider boosting
>> spark.yarn.executor.memoryOverhead.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


has any one implemented TF_IDF using ML transformers?

2016-01-15 Thread Andy Davidson
I wonder if I am missing something? TF-IDF is very popular. Spark ML has a
lot of transformers how ever it TF_IDF is not supported directly.

Spark provide a HashingTF and IDF transformer. The java doc
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf

Mentions you can implement TFIDF as follows

TFIDF(t,d,D)=TF(t,d)・IDF(t,D).

The problem I am running into is both HashingTF and IDF return a sparse
vector.

Ideally the spark code  to implement TFIDF would be one line.

 DataFrame ret = tmp.withColumn("features",
tmp.col("tf").multiply(tmp.col("idf")));

org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
data type mismatch: '(tf * idf)' requires numeric type, not vector;

I could implement my own UDF to do member wise multiplication how ever given
how common TF-IDF is I wonder if this code already exists some where

I found  org.apache.spark.util.Vector.Multiplier. There is no documentation
how ever give the argument is double, my guess is it just does scalar
multiplication. 

I guess I could do something like

Double[] v = mySparkVector.toArray();
 Then use JBlas to do member wise multiplication

I assume sparceVectors are not distributed so there  would not be any
additional communication cost


If this code is truly missing. I would be happy to write it and donate it

Andy


From:  Andrew Davidson 
Date:  Wednesday, January 13, 2016 at 2:52 PM
To:  "user @spark" 
Subject:  trouble calculating TF-IDF data type mismatch: '(tf * idf)'
requires numeric type, not vector;

> Bellow is a little snippet of my Java Test Code. Any idea how I implement
> member wise vector multiplication?
> 
> Kind regards
> 
> Andy
> 
> transformed df printSchema()
> 
> root
> 
>  |-- id: integer (nullable = false)
> 
>  |-- label: double (nullable = false)
> 
>  |-- words: array (nullable = false)
> 
>  ||-- element: string (containsNull = true)
> 
>  |-- tf: vector (nullable = true)
> 
>  |-- idf: vector (nullable = true)
> 
> 
> 
> +---+-++-+
> ---+
> 
> |id |label|words   |tf   |idf
> |
> 
> +---+-++-+
> ---+
> 
> |0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])
> |(7,[1,2],[0.0,0.9162907318741551]) |
> 
> |1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])
> |(7,[1,4],[0.0,0.9162907318741551]) |
> 
> |2  |0.0  |[Chinese, Macao]|(7,[1,6],[1.0,1.0])
> |(7,[1,6],[0.0,0.9162907318741551]) |
> 
> |3  |1.0  |[Tokyo, Japan, Chinese]
> |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.91629073187415
> 51])|
> 
> +---+-++-+
> ---+
> 
> 
> @Test
> 
> public void test() {
> 
> DataFrame rawTrainingDF = createTrainingData();
> 
> DataFrame trainingDF = runPipleLineTF_IDF(rawTrainingDF);
> 
> . . .
> 
> }
> 
>private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
> 
> HashingTF hashingTF = new HashingTF()
> 
> .setInputCol("words")
> 
> .setOutputCol("tf")
> 
> .setNumFeatures(dictionarySize);
> 
> 
> 
> DataFrame termFrequenceDF = hashingTF.transform(rawDF);
> 
> 
> 
> termFrequenceDF.cache(); // idf needs to make 2 passes over data set
> 
> IDFModel idf = new IDF()
> 
> //.setMinDocFreq(1) // our vocabulary has 6 words we
> hash into 7
> 
> .setInputCol(hashingTF.getOutputCol())
> 
> .setOutputCol("idf")
> 
> .fit(termFrequenceDF);
> 
> 
> 
> DataFrame tmp = idf.transform(termFrequenceDF);
> 
> 
> 
> DataFrame ret = tmp.withColumn("features",
> tmp.col("tf").multiply(tmp.col("idf")));
> 
> logger.warn("\ntransformed df printSchema()");
> 
> ret.printSchema();
> 
> ret.show(false);
> 
> 
> 
> return ret;
> 
> }
> 
> 
> 
> org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
> data type mismatch: '(tf * idf)' requires numeric type, not vector;
> 
> 
> 
> 
> 
> private DataFrame createTrainingData() {
> 
> // make sure we only use dictionarySize words
> 
> JavaRDD rdd = javaSparkContext.parallelize(Arrays.asList(
> 
> // 0 is Chinese
> 
> // 1 in notChinese
> 
> RowFactory.create(0, 0.0, Arrays.asList("Chinese", "Beijing",
> "Chinese")),
> 
> RowFactory.create(1, 0.0, Arrays.asList("Chinese", "Chinese",
> "Shanghai")),
> 
>   

Re: [Spark 1.6][Streaming] About the behavior of mapWithState

2016-01-15 Thread Shixiong(Ryan) Zhu
Hey Terry,

That's expected. If you want to only output (1, 3), you can use
"reduceByKey" before "mapWithState" like this:

dstream.reduceByKey(_ + _).mapWithState(spec)

On Fri, Jan 15, 2016 at 1:21 AM, Terry Hoo  wrote:

> Hi,
> I am doing a simple test with mapWithState, and get some events
> unexpected, is this correct?
>
> The test is very simple: sum the value of each key
>
> val mappingFunction = (key: Int, value: Option[Int], state: State[Int]) => {
>   state.update(state.getOption().getOrElse(0) + value.getOrElse(0))
>   (key, state.get())
> }
> val spec = StateSpec.function(mappingFunction)
> dstream.mapWithState(spec)
>
> I create two RDDs and insert into dstream:
> RDD((1,1), (1,2), (2,1))
> RDD((1,3))
>
> Get result like this:
> RDD(*(1,1)*, *(1,3)*, (2,1))
> RDD((1,6))
>
> You can see that the first batch will generate two items with the same key
> "1": (1,1) and (1,3), is this expected behavior? I would expect (1,3) only.
>
> Regards
> - Terry
>


Re: 答复: 答复: 答复: spark streaming context trigger invoke stop why?

2016-01-15 Thread Shixiong(Ryan) Zhu
I see. So when your job fails, `jsc.awaitTermination();` will throw an
exception. Then you app main method will exit and trigger the shutdown hook
and call `jsc.stop()`.

On Thu, Jan 14, 2016 at 10:20 PM, Triones,Deng(vip.com) <
triones.d...@vipshop.com> wrote:

> Thanks for your response .
>
> Our code as below :
>
>
>
>
>
> public void process(){
>
> logger.info("streaming process start !!!");
>
>
>
> SparkConf sparkConf =
> createSparkConf(this.getClass().getSimpleName());
>
>
>
> JavaStreamingContext jsc =
> this.createJavaStreamingContext(sparkConf);
>
>
>
> if(this.streamingListener != null){
>
> jsc.addStreamingListener(this.streamingListener);
>
> }
>
> JavaPairDStream allKafkaWindowData =
> this.sparkReceiverDStream.createReceiverDStream(jsc,this.streamingConf.getWindowDuration(),
>
> this.streamingConf.getSlideDuration());
>
>
>
> this.businessProcess(allKafkaWindowData);
>
> this.sleep();
>
>jsc.start();
>
> jsc.awaitTermination();
>
>
>
>
>
> *发件人:* Shixiong(Ryan) Zhu [mailto:shixi...@databricks.com]
> *发送时间:* 2016年1月15日 6:02
> *收件人:* 邓刚[技术中心]
> *抄送:* Yogesh Mahajan; user
> *主题:* Re: 答复: 答复: spark streaming context trigger invoke stop why?
>
>
>
> Could you show your codes? Did you use
> `StreamingContext.awaitTermination`? If so, it will return if any exception
> happens.
>
>
>
> On Wed, Jan 13, 2016 at 11:47 PM, Triones,Deng(vip.com) <
> triones.d...@vipshop.com> wrote:
>
> What’s more, I am running a 7*24 hours job , so I won’t call System.exit()
> by myself. So I believe somewhere of the driver kill itself
>
>
>
> *发件人:* 邓刚[技术中心]
> *发送时间:* 2016年1月14日 15:45
> *收件人:* 'Yogesh Mahajan'
> *抄送:* user
> *主题:* 答复: 答复: spark streaming context trigger invoke stop why?
>
>
>
> Thanks for your response, ApplicationMaster is only for yarn mode. I am
> using standalone mode. Could you kindly please let me know where trigger
> the shutdown hook?
>
>
>
> *发件人:* Yogesh Mahajan [mailto:ymaha...@snappydata.io
> ]
> *发送时间:* 2016年1月14日 12:42
> *收件人:* 邓刚[技术中心]
> *抄送:* user
> *主题:* Re: 答复: spark streaming context trigger invoke stop why?
>
>
>
> All the action happens in ApplicationMaster expecially in run method
>
> Check ApplicationMaster#startUserApplication : userThread(Driver) which
> invokes ApplicationMaster#finish method. You can also try System.exit in
> your program
>
>
>
> Regards,
>
> Yogesh Mahajan,
>
> SnappyData Inc, snappydata.io
>
>
>
> On Thu, Jan 14, 2016 at 9:56 AM, Yogesh Mahajan 
> wrote:
>
> Hi Triones,
>
>
>
> Check the org.apache.spark.util.ShutdownHookManager : It adds this
> ShutDownHook when you start a StreamingContext
>
>
>
> Here is the code in StreamingContext.start()
>
>
>
> shutdownHookRef = ShutdownHookManager.addShutdownHook(
>
>   StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
>
>
>
> Also looke at the following def in StreamingContext which actually stops
> the context from shutdown hook :
>
> private def stopOnShutdown(): Unit = {
>
> val stopGracefully =
> conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
>
> logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown
> hook")
>
> // Do not stop SparkContext, let its own shutdown hook stop it
>
> stop(stopSparkContext = false, stopGracefully = stopGracefully)
>
> }
>
>
>
> Regards,
>
> Yogesh Mahajan,
>
> SnappyData Inc, snappydata.io
>
>
>
> On Thu, Jan 14, 2016 at 8:55 AM, Triones,Deng(vip.com) <
> triones.d...@vipshop.com> wrote:
>
> More info
>
>
>
> I am using spark version 1.5.2
>
>
>
>
>
> *发件人:* Triones,Deng(vip.com) [mailto:triones.d...@vipshop.com]
> *发送时间:* 2016年1月14日 11:24
> *收件人:* user
> *主题:* spark streaming context trigger invoke stop why?
>
>
>
> Hi all
>
>  As I saw the driver log, the task failed 4 times in a stage, the
> stage will be dropped when the input block was deleted before make use of.
> After that the StreamingContext invoke stop.  Does anyone know what kind of
> akka message trigger the stop or which code trigger the shutdown hook?
>
>
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>
> Driver log:
>
>
>
>  Job aborted due to stage failure: Task 410 in stage 215.0 failed 4 times
>
> [org.apache.spark.streaming.StreamingContext---Thread-0]: Invoking
> stop(stopGracefully=false) from shutdown hook
>
>
> 本电子邮件可能为保密文件。如果阁下非电子邮件所指定之收件人,谨请立即通知本人。敬请阁下不要使用、保存、复印、打印、散布本电子邮件及其内容,或将其用于其他任何目的或向任何人披露。谢谢您的合作!
> This communication is intended only for the addressee(s) and may contain
> information that is privileged and confidential. You are hereby notified
> that, if you are not an intended recipient listed above, or an authorized
> employee or agent of an addressee of this communication responsible for
> delivering e-mail messages to an intended recipient, any dissemination,
> distribution or reproduction of this communication (including any
> attachments hereto) is strictly 

Re: DataFrameWriter on partitionBy for parquet eat all RAM

2016-01-15 Thread Jerry Lam
Hi Michael,

Thanks for sharing the tip. It will help to the write path of the partitioned 
table. 
Do you have similar suggestion on reading the partitioned table back when there 
is a million of distinct values on the partition field (for example on user 
id)? Last time I have trouble to read a partitioned table because it takes very 
long (over hours on s3) to execute the 
sqlcontext.read.parquet("partitioned_table").

Best Regards,

Jerry

Sent from my iPhone

> On 15 Jan, 2016, at 3:59 pm, Michael Armbrust  wrote:
> 
> See here for some workarounds: 
> https://issues.apache.org/jira/browse/SPARK-12546
> 
>> On Thu, Jan 14, 2016 at 6:46 PM, Jerry Lam  wrote:
>> Hi Arkadiusz,
>> 
>> the partitionBy is not designed to have many distinct value the last time I 
>> used it. If you search in the mailing list, I think there are couple of 
>> people also face similar issues. For example, in my case, it won't work over 
>> a million distinct user ids. It will require a lot of memory and very long 
>> time to read the table back. 
>> 
>> Best Regards,
>> 
>> Jerry
>> 
>>> On Thu, Jan 14, 2016 at 2:31 PM, Arkadiusz Bicz  
>>> wrote:
>>> Hi
>>> 
>>> What is the proper configuration for saving parquet partition with
>>> large number of repeated keys?
>>> 
>>> On bellow code I load 500 milion rows of data and partition it on
>>> column with not so many different values.
>>> 
>>> Using spark-shell with 30g per executor and driver and 3 executor cores
>>> 
>>> sqlContext.read.load("hdfs://notpartitioneddata").write.partitionBy("columnname").parquet("partitioneddata")
>>> 
>>> 
>>> Job failed because not enough memory in executor :
>>> 
>>> WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by
>>> YARN for exceeding memory limits. 43.5 GB of 43.5 GB physical memory
>>> used. Consider boosting spark.yarn.executor.memoryOverhead.
>>> 16/01/14 17:32:38 ERROR YarnScheduler: Lost executor 11 on
>>> datanode2.babar.poc: Container killed by YARN for exceeding memory
>>> limits. 43.5 GB of 43.5 GB physical memory used. Consider boosting
>>> spark.yarn.executor.memoryOverhead.
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Re: How To Save TF-IDF Model In PySpark

2016-01-15 Thread Andy Davidson
Are you using 1.6.0 or an older version?

I think I remember something in 1.5.1 saying save was not implemented in
python.


The current doc does not say anything about save()
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf

http://spark.apache.org/docs/latest/ml-guide.html#saving-and-loading-pipelin
es
"Often times it is worth it to save a model or a pipeline to disk for later
use. In Spark 1.6, a model import/export functionality was added to the
Pipeline API. Most basic transformers are supported as well as some of the
more basic ML models. Please refer to the algorithm¹s API documentation to
see if saving and loading is supported."

andy




From:  Asim Jalis 
Date:  Friday, January 15, 2016 at 4:02 PM
To:  "user @spark" 
Subject:  How To Save TF-IDF Model In PySpark

> Hi,
> 
> I am trying to save a TF-IDF model in PySpark. Looks like this is not
> supported. 
> 
> Using `model.save()` causes:
> 
> AttributeError: 'IDFModel' object has no attribute 'save'
> 
> Using `pickle` causes:
> 
> TypeError: can't pickle lock objects
> 
> Does anyone have suggestions
> 
> Thanks!
> 
> Asim
> 
> Here is the full repro. Start pyspark shell and then run this code in
> it.
> 
> ```
> # Imports
> from pyspark import SparkContext
> from pyspark.mllib.feature import HashingTF
> 
> from pyspark.mllib.regression import LabeledPoint
> from pyspark.mllib.regression import Vectors
> from pyspark.mllib.feature import IDF
> 
> # Create some data
> n = 4
> freqs = [
> Vectors.sparse(n, (1, 3), (1.0, 2.0)),
> Vectors.dense([0.0, 1.0, 2.0, 3.0]),
> Vectors.sparse(n, [1], [1.0])]
> data = sc.parallelize(freqs)
> idf = IDF()
> model = idf.fit(data)
> tfidf = model.transform(data)
> 
> # View
> for r in tfidf.collect(): print(r)
> 
> # Try to save it
> model.save("foo.model")
> 
> # Try to save it with Pickle
> import pickle
> pickle.dump(model, open("model.p", "wb"))
> pickle.dumps(model)
> ```




Re: How To Save TF-IDF Model In PySpark

2016-01-15 Thread Jerry Lam
Can you save it to parquet with the vector in one field?

Sent from my iPhone

> On 15 Jan, 2016, at 7:33 pm, Andy Davidson  
> wrote:
> 
> Are you using 1.6.0 or an older version?
> 
> I think I remember something in 1.5.1 saying save was not implemented in 
> python.
> 
> 
> The current doc does not say anything about save()
> http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf
> 
> http://spark.apache.org/docs/latest/ml-guide.html#saving-and-loading-pipelines
> "Often times it is worth it to save a model or a pipeline to disk for later 
> use. In Spark 1.6, a model import/export functionality was added to the 
> Pipeline API. Most basic transformers are supported as well as some of the 
> more basic ML models. Please refer to the algorithm’s API documentation to 
> see if saving and loading is supported."
> 
> andy
> 
> 
> 
> 
> From: Asim Jalis 
> Date: Friday, January 15, 2016 at 4:02 PM
> To: "user @spark" 
> Subject: How To Save TF-IDF Model In PySpark
> 
> Hi,
> 
> I am trying to save a TF-IDF model in PySpark. Looks like this is not
> supported. 
> 
> Using `model.save()` causes:
> 
> AttributeError: 'IDFModel' object has no attribute 'save'
> 
> Using `pickle` causes:
> 
> TypeError: can't pickle lock objects
> 
> Does anyone have suggestions 
> 
> Thanks!
> 
> Asim
> 
> Here is the full repro. Start pyspark shell and then run this code in
> it.
> 
> ```
> # Imports
> from pyspark import SparkContext
> from pyspark.mllib.feature import HashingTF
> 
> from pyspark.mllib.regression import LabeledPoint
> from pyspark.mllib.regression import Vectors
> from pyspark.mllib.feature import IDF
> 
> # Create some data
> n = 4
> freqs = [
> Vectors.sparse(n, (1, 3), (1.0, 2.0)), 
> Vectors.dense([0.0, 1.0, 2.0, 3.0]), 
> Vectors.sparse(n, [1], [1.0])]
> data = sc.parallelize(freqs)
> idf = IDF()
> model = idf.fit(data)
> tfidf = model.transform(data)
> 
> # View
> for r in tfidf.collect(): print(r)
> 
> # Try to save it
> model.save("foo.model")
> 
> # Try to save it with Pickle
> import pickle
> pickle.dump(model, open("model.p", "wb"))
> pickle.dumps(model)
> ```


Spark streaming: Fixed time aggregation & handling driver failures

2016-01-15 Thread ffarozan
I am implementing aggregation using spark streaming and kafka. My batch and
window size are same. And the aggregated data is persisted in Cassandra.

I want to aggregate for fixed time windows - 5:00, 5:05, 5:10, ...

But we cannot control when to run streaming job, we only get to specify the
batch interval. 

So the problem is - lets say if streaming job starts at 5:02, then I will
get results at 5:07, 5:12, etc. and not what I want.

Any suggestions?

thanks,
Firdousi



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-Fixed-time-aggregation-handling-driver-failures-tp25982.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Sending large objects to specific RDDs

2016-01-15 Thread Ted Yu
My knowledge of XSEDE is limited - I visited the website.

If there is no easy way to deploy HBase, alternative approach (using hdfs
?) needs to be considered.

I need to do more homework on this :-)

On Thu, Jan 14, 2016 at 3:51 PM, Daniel Imberman 
wrote:

> Hi Ted,
>
> So unfortunately after looking into the cluster manager that I will be
> using for my testing (I'm using a super-computer called XSEDE rather than
> AWS), it looks like the cluster does not actually come with Hbase installed
> (this cluster is becoming somewhat problematic, as it is essentially AWS
> but you have to do your own virtualization scripts). Do you have any other
> thoughts on how I could go about dealing with this purely using spark and
> HDFS?
>
> Thank you
>
> On Wed, Jan 13, 2016 at 11:49 AM Daniel Imberman <
> daniel.imber...@gmail.com> wrote:
>
>> Thank you Ted! That sounds like it would probably be the most efficient
>> (with the least overhead) way of handling this situation.
>>
>> On Wed, Jan 13, 2016 at 11:36 AM Ted Yu  wrote:
>>
>>> Another approach is to store the objects in NoSQL store such as HBase.
>>>
>>> Looking up object should be very fast.
>>>
>>> Cheers
>>>
>>> On Wed, Jan 13, 2016 at 11:29 AM, Daniel Imberman <
>>> daniel.imber...@gmail.com> wrote:
>>>
 I'm looking for a way to send structures to pre-determined partitions
 so that
 they can be used by another RDD in a mapPartition.

 Essentially I'm given and RDD of SparseVectors and an RDD of inverted
 indexes. The inverted index objects are quite large.

 My hope is to do a MapPartitions within the RDD of vectors where I can
 compare each vector to the inverted index. The issue is that I only
 NEED one
 inverted index object per partition (which would have the same key as
 the
 values within that partition).


 val vectors:RDD[(Int, SparseVector)]

 val invertedIndexes:RDD[(Int, InvIndex)] =
 a.reduceByKey(generateInvertedIndex)
 vectors:RDD.mapPartitions{
 iter =>
  val invIndex = invertedIndexes(samePartitionKey)
  iter.map(invIndex.calculateSimilarity(_))
  )
 }

 How could I go about setting up the Partition such that the specific
 data
 structure I need will be present for the mapPartition but I won't have
 the
 extra overhead of sending over all values (which would happen if I were
 to
 make a broadcast variable).

 One thought I have been having is to store the objects in HDFS but I'm
 not
 sure if that would be a suboptimal solution (It seems like it could slow
 down the process a lot)

 Another thought I am currently exploring is whether there is some way I
 can
 create a custom Partition or Partitioner that could hold the data
 structure
 (Although that might get too complicated and become problematic)

 Any thoughts on how I could attack this issue would be highly
 appreciated.

 thank you for your help!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Sending-large-objects-to-specific-RDDs-tp25967.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>


RE: Is it possible to use SparkSQL JDBC ThriftServer without Hive

2016-01-15 Thread Mohammed Guller
Sambit - I believe the default Derby-based metastore allows only one active 
user at a time. You can replace it with MySQL or Postgres. 

Using the Hive Metastore enables Spark SQL to be compatible with Hive. If you 
have an existing Hive setup, you can Spark SQL to process data in your Hive 
tables. 

Mohammed

-Original Message-
From: Sambit Tripathy (RBEI/EDS1) [mailto:sambit.tripa...@in.bosch.com] 
Sent: Friday, January 15, 2016 11:30 AM
To: Mohammed Guller; angela.whelan; user@spark.apache.org
Subject: RE: Is it possible to use SparkSQL JDBC ThriftServer without Hive

Hi Mohammed,

I think this is something you can do at the Thrift server startup. So this 
would run an instance of Derby and act as a Metastore. Any idea if this Debry 
Metastore will have distributed access and why do we use the Hive Metastore 
then?

@Angela: I would  also be happy to have a metastore owned by Spark Thrift 
Server. What are you trying to achieve by using the Thrift server without Hive?


Regards,
Sambit.


-Original Message-
From: Mohammed Guller [mailto:moham...@glassbeam.com]
Sent: Wednesday, January 13, 2016 2:54 PM
To: angela.whelan ; user@spark.apache.org
Subject: RE: Is it possible to use SparkSQL JDBC ThriftServer without Hive

Hi Angela,
Yes, you can use Spark SQL JDBC/ThriftServer without Hive.

Mohammed


-Original Message-
From: angela.whelan [mailto:angela.whe...@synchronoss.com]
Sent: Wednesday, January 13, 2016 3:37 AM
To: user@spark.apache.org
Subject: Is it possible to use SparkSQL JDBC ThriftServer without Hive

hi,
I'm wondering if it is possible to use the SparkSQL JDBC ThriftServer without 
Hive?

The reason I'm asking is that we are unsure about the speed of Hive with 
SparkSQL JDBC connectivity.

I can't find any article online about using SparkSQL JDBC ThriftServer without 
Hive.

Many thanks in advance for any help on this.

Thanks, Angela



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-SparkSQL-JDBC-ThriftServer-without-Hive-tp25959.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Consuming commands from a queue

2016-01-15 Thread Afshartous, Nick

Hi,


We have a streaming job that consumes from Kafka and outputs to S3.  We're 
going to have the job also send commands (to copy from S3 to Redshift) into a 
different Kafka topic.


What would be the best framework for consuming and processing the copy commands 
?  We're considering creating a second streaming job or using Akka.


Thanks for any suggestions,

--

Nick


Re: Compiling only MLlib?

2016-01-15 Thread Ted Yu
Looks like you didn't have zinc running.

Take a look at install_zinc() in build/mvn, around line 83.
You can use build/mvn instead of running mvn directly.

I normally use the following command line:

bin/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.4
-Dhadoop.version=2.7.0 package -DskipTests

After one full build, you should be able to build MLlib module alone.

Cheers

On Fri, Jan 15, 2016 at 6:13 PM, Colin Woodbury  wrote:

> Hi, I'm very much interested in using Spark's MLlib in standalone
> programs. I've never used Hadoop, and don't intend to deploy on massive
> clusters. Building Spark has been an honest nightmare, and I've been on and
> off it for weeks.
>
> The build always runs out of RAM on my laptop (4g of RAM, Arch Linux) when
> I try to build with Scala 2.11 support. No matter how I tweak JVM flags to
> reduce maximum RAM use, the build always crashes.
>
> When trying to build Spark 1.6.0 for Scala 2.10 just now, the build had
> compilation errors. Here is one, as a sample. I've saved the rest:
>
> [error]
> /home/colin/building/apache-spark/spark-1.6.0/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala:16:
> object jline is not a member of package tools
> [error] import scala.tools.jline.console.completer._
>
> It informs me:
>
> [ERROR] After correcting the problems, you can resume the build with the
> command
> [ERROR]   mvn  -rf :spark-repl_2.10
>
> I don't feel safe doing that, given that I don't know what my ""
> are.
>
> I've noticed that the build is compiling a lot of things I have no
> interest in. Is it possible to just compile the Spark core, its tools, and
> MLlib? I just want to experiment, and this is causing me a  lot of stress.
>
> Thank you kindly,
> Colin
>


Re: Compiling only MLlib?

2016-01-15 Thread Matei Zaharia
Have you tried just downloading a pre-built package, or linking to Spark 
through Maven? You don't need to build it unless you are changing code inside 
it. Check out 
http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications
 for how to link to it.

Matei

> On Jan 15, 2016, at 6:13 PM, Colin Woodbury  wrote:
> 
> Hi, I'm very much interested in using Spark's MLlib in standalone programs. 
> I've never used Hadoop, and don't intend to deploy on massive clusters. 
> Building Spark has been an honest nightmare, and I've been on and off it for 
> weeks.
> 
> The build always runs out of RAM on my laptop (4g of RAM, Arch Linux) when I 
> try to build with Scala 2.11 support. No matter how I tweak JVM flags to 
> reduce maximum RAM use, the build always crashes.
> 
> When trying to build Spark 1.6.0 for Scala 2.10 just now, the build had 
> compilation errors. Here is one, as a sample. I've saved the rest:
> 
> [error] 
> /home/colin/building/apache-spark/spark-1.6.0/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala:16:
>  object jline is not a member of package tools
> [error] import scala.tools.jline.console.completer._
> 
> It informs me:
> 
> [ERROR] After correcting the problems, you can resume the build with the 
> command
> [ERROR]   mvn  -rf :spark-repl_2.10
> 
> I don't feel safe doing that, given that I don't know what my "" are. 
> 
> I've noticed that the build is compiling a lot of things I have no interest 
> in. Is it possible to just compile the Spark core, its tools, and MLlib? I 
> just want to experiment, and this is causing me a  lot of stress.
> 
> Thank you kindly,
> Colin



Executor initialize before all resources are ready

2016-01-15 Thread Byron Wang
Hi, I am building metrics system for Spark Streaming job, in the system, the
metrics are collected in each executor, so a metrics source (a class used to
collect metrics) needs to be initialized in each executor. 
The metrics source is packaged in a jar, when submitting a job, the jar is
sent from local to each executor using the parameter '--jars', however, the
executor starts to initialize the metrics source class before the jar
arrives, as a result, it throws class not found exception.
It seems that if the executor could wait until all resources are ready, the
issue will be resolved, but I really do not know how to do it.

Is there anyone facing the same issue?

PS: I tried using HDFS (copy the jar to HDFS, then submit the job and let
the executor load class from a path in HDFS), but it fails. I checked the
source code, it seems that the class loader can only resolve local path.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-initialize-before-all-resources-are-ready-tp25981.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Consuming commands from a queue

2016-01-15 Thread Cody Koeninger
Reading commands from kafka and triggering a redshift copy is sufficiently
simple it could just be a bash script.  But if you've already got a spark
streaming job set up, may as well use it for consistency's sake.  There's
definitely no need to mess around with akka.

On Fri, Jan 15, 2016 at 6:25 PM, Afshartous, Nick 
wrote:

>
> Hi,
>
>
> We have a streaming job that consumes from Kafka and outputs to S3.  We're
> going to have the job also send commands (to copy from S3 to Redshift) into
> a different Kafka topic.
>
>
> What would be the best framework for consuming and processing the copy
> commands ?  We're considering creating a second streaming job or using Akka.
>
>
> Thanks for any suggestions,
>
> --
>
> Nick
>


Re: Spark streaming: Fixed time aggregation & handling driver failures

2016-01-15 Thread Cody Koeninger
You can't really use spark batches as the basis for any kind of reliable
time aggregation.  Time of batch processing in general has nothing to do
with time of event.

You need to filter / aggregate by the time interval you care about, in your
own code, or use a data store that can do the aggregation.



On Fri, Jan 15, 2016 at 9:13 PM, ffarozan  wrote:

> I am implementing aggregation using spark streaming and kafka. My batch and
> window size are same. And the aggregated data is persisted in Cassandra.
>
> I want to aggregate for fixed time windows - 5:00, 5:05, 5:10, ...
>
> But we cannot control when to run streaming job, we only get to specify the
> batch interval.
>
> So the problem is - lets say if streaming job starts at 5:02, then I will
> get results at 5:07, 5:12, etc. and not what I want.
>
> Any suggestions?
>
> thanks,
> Firdousi
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-Fixed-time-aggregation-handling-driver-failures-tp25982.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Multi tenancy, REST and MLlib

2016-01-15 Thread Kevin Mellott
It sounds like you may be interested in a solution that implements the Lambda
Architecture , such as
Oryx2 . At a high level, this gives you the ability to
request and receive information immediately (serving layer), generating the
responses using a pre-built model (speed layer). Meanwhile, that model is
constantly being updated in the background as new information becomes
available (batch layer).

An example of a pre-built model in this scenario may be a predictive model
that want to predict the class of an incoming piece of data (i.e. does this
email look like SPAM or not).

On Fri, Jan 15, 2016 at 5:00 PM, feribg  wrote:

> I'm fairly new to Spark and Mllib, but i'm doing some research into multi
> tenancy of mllib based app. The idea is to provide ability to train models
> on demand with certain constraints (executor size) and then allow to serve
> predictions from those models via a REST layer.
>
> So far from my research I've gathered the following:
>
> 1) It's fairly easy to schedule training jobs and define the size of the
> executor of the job with something like spark job server or via cmd. I'd
> imagine you need separate contexts here anyways, because if theres one big
> context shared amongst different tenants, it wont allow training different
> models in parallel for the most part. So the solution here seems a context
> per tenant and training via Spark Job Server.
>
> 2) Second part seems a bit more tricky as it must expose the results of the
> trained models to the outside world via some form of API. So far I've been
> able to create a new context inside of a simple Spring REST application,
> load the persisted model and be able to call predict and return results.
>
> My main problem with this approach is that now I need to load the whole
> spark context for each single model instance and a single tenant can
> potentially have a bunch, which also means at least a JVM per tenant and
> this is quite wasteful. It seems the actual prediction part is fairly
> simple
> and I was wondering if there was a way to share multiple models to predict
> from on the same context. Would that allow parallel predictions (ie model B
> doesnt have to wait for a prediction of model A to complete in order to
> return).
>
> Given this simple scenario do you see a better approach to architect that,
> maybe I'm missing certain features of Spark that would facilitate it in a
> cleaner and more efficient manner.
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Multi-tenancy-REST-and-MLlib-tp25979.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: SparkContext SyntaxError: invalid syntax

2016-01-15 Thread Bryan Cutler
Glad you got it going!  It's wasn't very obvious what needed to be set,
maybe it is worth explicitly stating this in the docs since it seems to
have come up a couple times before too.

Bryan

On Fri, Jan 15, 2016 at 12:33 PM, Andrew Weiner <
andrewweiner2...@u.northwestern.edu> wrote:

> Actually, I just found this [
> https://issues.apache.org/jira/browse/SPARK-1680], which after a bit of
> googling and reading leads me to believe that the preferred way to change
> the yarn environment is to edit the spark-defaults.conf file by adding this
> line:
> spark.yarn.appMasterEnv.PYSPARK_PYTHON/path/to/python
>
> While both this solution and the solution from my prior email work, I
> believe this is the preferred solution.
>
> Sorry for the flurry of emails.  Again, thanks for all the help!
>
> Andrew
>
> On Fri, Jan 15, 2016 at 1:47 PM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
>> I finally got the pi.py example to run in yarn cluster mode.  This was
>> the key insight:
>> https://issues.apache.org/jira/browse/SPARK-9229
>>
>> I had to set SPARK_YARN_USER_ENV in spark-env.sh:
>> export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=/home/aqualab/local/bin/python"
>>
>> This caused the PYSPARK_PYTHON environment variable to be used in my yarn
>> environment in cluster mode.
>>
>> Thank you for all your help!
>>
>> Best,
>> Andrew
>>
>>
>>
>> On Fri, Jan 15, 2016 at 12:57 PM, Andrew Weiner <
>> andrewweiner2...@u.northwestern.edu> wrote:
>>
>>> I tried playing around with my environment variables, and here is an
>>> update.
>>>
>>> When I run in cluster mode, my environment variables do not persist
>>> throughout the entire job.
>>> For example, I tried creating a local copy of HADOOP_CONF_DIR in
>>> /home//local/etc/hadoop/conf, and then, in spark-env.sh I the
>>> variable:
>>> export HADOOP_CONF_DIR=/home//local/etc/hadoop/conf
>>>
>>> Later, when we print the environment variables in the python code, I see
>>> this:
>>>
>>> ('HADOOP_CONF_DIR', '/etc/hadoop/conf')
>>>
>>> However, when I run in client mode, I see this:
>>>
>>> ('HADOOP_CONF_DIR', '/home/awp066/local/etc/hadoop/conf')
>>>
>>> Furthermore, if I omit that environment variable from spark-env.sh 
>>> altogether, I get the expected error in both client and cluster mode:
>>>
>>> When running with master 'yarn'
>>> either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
>>>
>>> This suggests that my environment variables are being used when I first 
>>> submit the job, but at some point during the job, my environment variables 
>>> are thrown out and someone's (yarn's?) environment variables are being used.
>>>
>>> Andrew
>>>
>>>
>>> On Fri, Jan 15, 2016 at 11:03 AM, Andrew Weiner <
>>> andrewweiner2...@u.northwestern.edu> wrote:
>>>
 Indeed!  Here is the output when I run in cluster mode:

 Traceback (most recent call last):
   File "pi.py", line 22, in ?
 raise RuntimeError("\n"+str(sys.version_info) +"\n"+
 RuntimeError:
 (2, 4, 3, 'final', 0)
 [('PYSPARK_GATEWAY_PORT', '48079'), ('PYTHONPATH', 
 '/scratch2/hadoop/yarn/local/usercache//filecache/116/spark-assembly-1.6.0-hadoop2.4.0.jar:/home//spark-1.6.0-bin-hadoop2.4/python:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0239/container_1450370639491_0239_01_01/py4j-0.9-src.zip'),
  ('PYTHONUNBUFFERED', 'YES')]

 As we suspected, it is using Python 2.4

 One thing that surprises me is that PYSPARK_PYTHON is not showing up in 
 the list, even though I am setting it and exporting it in spark-submit 
 *and* in spark-env.sh.  Is there somewhere else I need to set this 
 variable?  Maybe in one of the hadoop conf files in my HADOOP_CONF_DIR?

 Andrew



 On Thu, Jan 14, 2016 at 1:14 PM, Bryan Cutler 
 wrote:

> It seems like it could be the case that some other Python version is
> being invoked.  To make sure, can you add something like this to the top 
> of
> the .py file you are submitting to get some more info about how the
> application master is configured?
>
> import sys, os
> raise RuntimeError("\n"+str(sys.version_info) +"\n"+
> str([(k,os.environ[k]) for k in os.environ if "PY" in k]))
>
> On Thu, Jan 14, 2016 at 8:37 AM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
>> Hi Bryan,
>>
>> I ran "$> python --version" on every node on the cluster, and it is
>> Python 2.7.8 for every single one.
>>
>> When I try to submit the Python example in client mode
>> * ./bin/spark-submit  --master yarn --deploy-mode client
>> --driver-memory 4g --executor-memory 2g --executor-cores 1
>> ./examples/src/main/python/pi.py 10*
>> That's when I get this 

Multi tenancy, REST and MLlib

2016-01-15 Thread feribg
I'm fairly new to Spark and Mllib, but i'm doing some research into multi
tenancy of mllib based app. The idea is to provide ability to train models
on demand with certain constraints (executor size) and then allow to serve
predictions from those models via a REST layer.

So far from my research I've gathered the following:

1) It's fairly easy to schedule training jobs and define the size of the
executor of the job with something like spark job server or via cmd. I'd
imagine you need separate contexts here anyways, because if theres one big
context shared amongst different tenants, it wont allow training different
models in parallel for the most part. So the solution here seems a context
per tenant and training via Spark Job Server.

2) Second part seems a bit more tricky as it must expose the results of the
trained models to the outside world via some form of API. So far I've been
able to create a new context inside of a simple Spring REST application,
load the persisted model and be able to call predict and return results.

My main problem with this approach is that now I need to load the whole
spark context for each single model instance and a single tenant can
potentially have a bunch, which also means at least a JVM per tenant and
this is quite wasteful. It seems the actual prediction part is fairly simple
and I was wondering if there was a way to share multiple models to predict
from on the same context. Would that allow parallel predictions (ie model B
doesnt have to wait for a prediction of model A to complete in order to
return).

Given this simple scenario do you see a better approach to architect that,
maybe I'm missing certain features of Spark that would facilitate it in a
cleaner and more efficient manner.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Multi-tenancy-REST-and-MLlib-tp25979.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How To Save TF-IDF Model In PySpark

2016-01-15 Thread Asim Jalis
Hi,

I am trying to save a TF-IDF model in PySpark. Looks like this is not
supported.

Using `model.save()` causes:

AttributeError: 'IDFModel' object has no attribute 'save'

Using `pickle` causes:

TypeError: can't pickle lock objects

Does anyone have suggestions

Thanks!

Asim

Here is the full repro. Start pyspark shell and then run this code in
it.

```
# Imports
from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import Vectors
from pyspark.mllib.feature import IDF

# Create some data
n = 4
freqs = [
Vectors.sparse(n, (1, 3), (1.0, 2.0)),
Vectors.dense([0.0, 1.0, 2.0, 3.0]),
Vectors.sparse(n, [1], [1.0])]
data = sc.parallelize(freqs)
idf = IDF()
model = idf.fit(data)
tfidf = model.transform(data)

# View
for r in tfidf.collect(): print(r)

# Try to save it
model.save("foo.model")

# Try to save it with Pickle
import pickle
pickle.dump(model, open("model.p", "wb"))
pickle.dumps(model)
```


Compiling only MLlib?

2016-01-15 Thread Colin Woodbury
Hi, I'm very much interested in using Spark's MLlib in standalone programs.
I've never used Hadoop, and don't intend to deploy on massive clusters.
Building Spark has been an honest nightmare, and I've been on and off it
for weeks.

The build always runs out of RAM on my laptop (4g of RAM, Arch Linux) when
I try to build with Scala 2.11 support. No matter how I tweak JVM flags to
reduce maximum RAM use, the build always crashes.

When trying to build Spark 1.6.0 for Scala 2.10 just now, the build had
compilation errors. Here is one, as a sample. I've saved the rest:

[error]
/home/colin/building/apache-spark/spark-1.6.0/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala:16:
object jline is not a member of package tools
[error] import scala.tools.jline.console.completer._

It informs me:

[ERROR] After correcting the problems, you can resume the build with the
command
[ERROR]   mvn  -rf :spark-repl_2.10

I don't feel safe doing that, given that I don't know what my ""
are.

I've noticed that the build is compiling a lot of things I have no interest
in. Is it possible to just compile the Spark core, its tools, and MLlib? I
just want to experiment, and this is causing me a  lot of stress.

Thank you kindly,
Colin


spark.master overwritten in standalone plus cluster deploy-mode

2016-01-15 Thread shanson
Issue: 'spark.master' not updated in driver launch command when mastership
changes.

Settings: --deploy-mode cluster \
 --supervise \
 --master "spark://master1:6066,master2:6066,master3:6066" \
 --conf "spark.master=spark://master1:7077,master2:7077,master3:7077"
... java opts and program args


This works fine most of the time, except in the following scenario:
1. master1 is told to launch driver and picks workerA to do so.
2. master1 leaves the cluster and workerA recognizes master2 as the new
master
3. driver dies for some reason
4. workerA tries to relaunch driver with
"-Dspark.master=spark://master1:7077"
5. driver gets connection refused repeatedly and never finds the new master.

The issue seems to stem from the rest server unconditionally overwriting
spark.master witht the current master url and nothing ever refreshing it:
https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala#L147

A hacky workaround that I found was to use "spark.driver.extraJavaOption" to
pass in "-Dspark.master=spark://master1:7077,master2:7077,master3:7077".
This causes the master to be properly found, although it gets a little
confusing on the command line to see spark.master specified so many times.

Does anyone know if this is purposeful or a bug?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-overwritten-in-standalone-plus-cluster-deploy-mode-tp25980.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Executor initialize before all resources are ready

2016-01-15 Thread Ted Yu
Which Spark release are you using ?

Thanks

On Fri, Jan 15, 2016 at 7:08 PM, Byron Wang  wrote:

> Hi, I am building metrics system for Spark Streaming job, in the system,
> the
> metrics are collected in each executor, so a metrics source (a class used
> to
> collect metrics) needs to be initialized in each executor.
> The metrics source is packaged in a jar, when submitting a job, the jar is
> sent from local to each executor using the parameter '--jars', however, the
> executor starts to initialize the metrics source class before the jar
> arrives, as a result, it throws class not found exception.
> It seems that if the executor could wait until all resources are ready, the
> issue will be resolved, but I really do not know how to do it.
>
> Is there anyone facing the same issue?
>
> PS: I tried using HDFS (copy the jar to HDFS, then submit the job and let
> the executor load class from a path in HDFS), but it fails. I checked the
> source code, it seems that the class loader can only resolve local path.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executor-initialize-before-all-resources-are-ready-tp25981.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


AIC in Linear Regression in ml pipeline

2016-01-15 Thread Arunkumar Pillai
Hi

Is it possible to get AIC value in Linear Regression using ml pipeline ? Is
so please help me

-- 
Thanks and Regards
Arun


[Spark 1.6][Streaming] About the behavior of mapWithState

2016-01-15 Thread Terry Hoo
Hi,
I am doing a simple test with mapWithState, and get some events unexpected,
is this correct?

The test is very simple: sum the value of each key

val mappingFunction = (key: Int, value: Option[Int], state: State[Int]) => {
  state.update(state.getOption().getOrElse(0) + value.getOrElse(0))
  (key, state.get())
}
val spec = StateSpec.function(mappingFunction)
dstream.mapWithState(spec)

I create two RDDs and insert into dstream:
RDD((1,1), (1,2), (2,1))
RDD((1,3))

Get result like this:
RDD(*(1,1)*, *(1,3)*, (2,1))
RDD((1,6))

You can see that the first batch will generate two items with the same key
"1": (1,1) and (1,3), is this expected behavior? I would expect (1,3) only.

Regards
- Terry


RE: NPE when using Joda DateTime

2016-01-15 Thread Spencer, Alex (Santander)
Hi,

I tried Zhu’s recommendation and sadly got the same error. (Again, single map 
worked by the groupBy / flatMap generates this error).

Does Kryo has a bug i.e. it’s not serialising all components needed, or do  I 
just need to get our IT team to install those magro Serializers as suggested by 
Todd? If that variable is transient then actually that means Kryo is working as 
it’s meant to?

Am I at the point where I should pull apart the source code and build my own 
DateTime class? I hate reinventing the wheel though.

Thanks,
Alex.

From: Shixiong(Ryan) Zhu [mailto:shixi...@databricks.com]
Sent: 14 January 2016 21:57
To: Durgesh Verma
Cc: Spencer, Alex (Santander); Todd Nist; Sean Owen; user@spark.apache.org
Subject: Re: NPE when using Joda DateTime

Could you try to use "Kryo.setDefaultSerializer" like this:

class YourKryoRegistrator extends KryoRegistrator {

  override def registerClasses(kryo: Kryo) {

kryo.setDefaultSerializer(classOf[com.esotericsoftware.kryo.serializers.JavaSerializer])
  }
}


On Thu, Jan 14, 2016 at 12:54 PM, Durgesh Verma 
> wrote:
Today is my day... Trying to go thru where I can pitch in. Let me know if below 
makes sense.

I looked at joda Java Api source code (1.2.9) and traced that call in NPE. It 
looks like AssembledChronology class is being used, the iYears instance 
variable is defined as transient.

DateTime.minusYears(int years) call trace:
long instant = getChronology().years().subtract(getMillis(), years);

Not sure how the suggested serializer would help if variable is transient.

Thanks,
-Durgesh

On Jan 14, 2016, at 11:49 AM, Spencer, Alex (Santander) 
>
 wrote:
I appreciate this – thank you.

I’m not an admin on the box I’m using spark-shell on – so I’m not sure I can 
add them to that namespace. I’m hoping if I declare the JodaDateTimeSerializer 
class in my REPL that I can still get this to work. I think the INTERVAL part 
below may be key, I haven’t tried that yet.

Kind Regards,
Alex.

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: 14 January 2016 16:28
To: Spencer, Alex (Santander)
Cc: Sean Owen; user@spark.apache.org
Subject: Re: NPE when using Joda DateTime

I had a similar problem a while back and leveraged these Kryo serializers, 
https://github.com/magro/kryo-serializers.  I had to fallback to version 0.28, 
but that was a while back.  You can add these to the
org.apache.spark.serializer.KryoRegistrator
and then set your registrator in the spark config:
sparkConfig.
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "com.yourpackage.YourKryoRegistrator")
...

where YourKryoRegistrator is something like:
class YourKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
kryo.register(classOf[org.joda.time.DateTime], new JodaDateTimeSerializer)
kryo.register(classOf[org.joda.time.Interval], new JodaIntervalSerializer)
  }
}
HTH.
-Todd

On Thu, Jan 14, 2016 at 9:28 AM, Spencer, Alex (Santander) 
>
 wrote:
Hi,

I tried take(1500) and test.collect and these both work on the "single" map 
statement.

I'm very new to Kryo serialisation, I managed to find some code and I copied 
and pasted and that's what originally made the single map statement work:

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
kryo.register(classOf[org.joda.time.DateTime])
  }
}

Is it because the groupBy sees a different class type? Maybe Array[DateTime]? I 
don’t want to find the answer by trial and error though.

Alex

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: 14 January 2016 14:07
To: Spencer, Alex (Santander)
Cc: user@spark.apache.org
Subject: Re: NPE when using Joda DateTime

It does look somehow like the state of the DateTime object isn't being 
recreated properly on deserialization somehow, given where the NPE occurs (look 
at the Joda source code). However the object is java.io.Serializable. Are you 
sure the Kryo serialization is correct?

It doesn't quite explain why the map operation works by itself. It could be the 
difference between executing locally (take(1) will look at 1 partition in 1 
task which prefers to be local) and executing remotely (groupBy is going to 
need a shuffle).

On Thu, Jan 14, 2016 at 1:01 PM, Spencer, Alex (Santander) 
>
 wrote:
> Hello,
>
>
>
> I was wondering if somebody is able to help me get to the bottom of a
> null pointer exception I’m seeing in my code. I’ve managed to narrow
> down a problem in a larger class to my use of Joda’s DateTime
> functions. I’ve successfully run my