Announcing the Community Over Code 2024 Streaming Track

2024-03-20 Thread James Hughes
Hi all,

Community Over Code, the ASF conference, will be held in Denver, Colorado,
October 7-10, 2024. The call for presentations is open now through
April 15, 2024.  (This is two months earlier than last year!)

I am one of the co-chairs for the stream processing track, and we would
love to see you there and hope that you will consider submitting a talk.

About the Streaming track:

There are many top-level ASF projects which focus on and push the envelope
for stream and event processing.  ActiveMQ, Beam, Bookkeeper, Camel, Flink,
Kafka, Pulsar, RocketMQ, and Spark are all household names in the stream
processing and analytics world at this point.  These projects show that
stream processing has unique characteristics requiring deep expertise.  On
the other hand, users need easy-to-apply solutions.  The streaming track
will host talks focused on the use cases and advances of these projects as
well as other developments in the streaming world.

Thanks and see you in October!

Jim


Re: Spark Connect, Master, and Workers

2023-09-01 Thread James Yu
Can I simply understand Spark Connect this way:  The client process is now the 
Spark driver?

From: Brian Huynh 
Sent: Thursday, August 10, 2023 10:15 PM
To: Kezhi Xiong 
Cc: user@spark.apache.org 
Subject: Re: Spark Connect, Master, and Workers

Hi Kezhi,

Yes, you no longer need to start a master to make the client work. Please see 
the quickstart.

https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_connect.html

You can think of Spark Connect as an API on top of Master so workers can be 
added to the cluster same as before.

https://blog.insightdatascience.com/simply-install-spark-cluster-mode-341843a52b88

Regards,
Brian

On Wed, Aug 9, 2023 at 3:20 PM Kezhi Xiong  wrote:
Hi,

I've recently been learning Spark Connect and have some questions about the 
connect server's relation to the master and workers. When I'm using the connect 
server, I don't have to start a master alongside it to make clients work. Is the 
connect server simply using "local[*]" as master? And if I want to add 
workers for my connect server, is that supported, and what should I do?

Kezhi


--
From Brian H.


[k8s] Fail to expose custom port on executor container specified in my executor pod template

2023-06-26 Thread James Yu
Hi Team,


I have had no luck exposing port 5005 (for remote debugging) on 
my executor container using the following pod template and Spark configuration:

s3a://mybucket/pod-template-executor-debug.yaml

apiVersion: v1
kind: Pod
spec:
  containers:
  - name: spark-kubernetes-executor
    ports:
    - containerPort: 5005
      name: debug
      protocol: TCP

--conf spark.kubernetes.executor.podTemplateFile=s3a://mybucket/pod-template-executor-debug.yaml


The resultant executor container only exposes the default 7079/TCP port, but 
not the 5005/TCP that I wanted it to expose.

It works just fine for the driver container with the similar settings where I 
can see all ports are exposed (5005/TCP, 7078/TCP, 7079/TCP, 4040/TCP) as 
expected.

Did I miss anything, or is this a known bug where the executor pod template is 
not respected when it comes to exposing ports?

Thanks in advance for your help.

James


Announcing the Community Over Code 2023 Streaming Track

2023-06-09 Thread James Hughes
Hi all,

Community Over Code, the ASF conference, will be held in Halifax, Nova Scotia,
October 7-10, 2023. The call for presentations is open now through
July 13, 2023.

I am one of the co-chairs for the stream processing track, and we would
love to see you there and hope that you will consider submitting a talk.

About the Streaming track:

There are many top-level ASF projects which focus on and push the envelope
for stream and event processing.  ActiveMQ, Beam, Bookkeeper, Camel, Flink,
Kafka, Pulsar, RocketMQ, and Spark are all household names in the stream
processing and analytics world at this point.  These projects show that
stream processing has unique characteristics requiring deep expertise.  On
the other hand, users need easy-to-apply solutions.  The streaming track
will host talks focused on the use cases and advances of these projects as
well as other developments in the streaming world.

Thanks and see you in October!

Jim


Re: query time comparison to several SQL engines

2022-04-07 Thread James Turton
What might be the biggest factor affecting running time here is that 
Drill's query execution is not fault tolerant while Spark's is.  The 
philosophy is different: Drill's says "when you're doing interactive 
analytics and a node dies, killing your query as it goes, just run the 
query again."


On 2022/04/07 16:11, Wes Peng wrote:


Hi Jacek,

Spark and Drill are not directly related, but they have a similar 
architecture.


If you read the book "Learning Apache Drill" (I guess it's free 
online), chap 3 will give you Drill's SQL engine architecture:



It's quite similar to Spark's.

And the distributed implementation architecture is almost the same as 
Spark:



Though they are separate products, they have a similar 
implementation, IMO.


No, I didn't use a statement optimized for Drill. It's just a common 
SQL statement.


I think the reason Drill is faster is its direct mmap 
technology. It consumes more memory than Spark, so it is faster.


Thanks.


Jacek Laskowski wrote:
Is this true that Drill is Spark or vice versa under the hood? If so, 
how is it possible that Drill is faster? What does Drill do to make 
the query faster? Could this be that you used a type of query Drill 
is optimized for? Just guessing and am really curious (not implying 
that one is better or worse than the other(s)).



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Log4j 1.2.17 spark CVE

2021-12-13 Thread James Yu
Question: Spark uses log4j 1.2.17. If my application jar contains log4j 2.x and 
gets submitted to the Spark cluster, which version of log4j actually gets used 
during the Spark session?

From: Sean Owen 
Sent: Monday, December 13, 2021 8:25 AM
To: Jörn Franke 
Cc: Pralabh Kumar ; dev ; 
user.spark 
Subject: Re: Log4j 1.2.17 spark CVE

This has come up several times over years - search JIRA. The very short summary 
is: Spark does not use log4j 1.x, but its dependencies do, and that's the issue.
Anyone that can successfully complete the surgery at this point is welcome to, 
but I failed ~2 years ago.

On Mon, Dec 13, 2021 at 10:02 AM Jörn Franke <jornfra...@gmail.com> wrote:
Is it in any case appropriate to use log4j 1.x, which is not maintained anymore 
and has other security vulnerabilities which won’t be fixed anymore?

On 13.12.2021 at 06:06, Sean Owen <sro...@gmail.com> wrote:


Check the CVE - the log4j vulnerability appears to affect log4j 2, not 1.x. 
There was mention that it could affect 1.x when used with JNDI or JMS handlers, 
but Spark does neither. (unless anyone can think of something I'm missing, but 
never heard or seen that come up at all in 7 years in Spark)

The big issue would be applications that themselves configure log4j 2.x, but 
that's not a Spark issue per se.

On Sun, Dec 12, 2021 at 10:46 PM Pralabh Kumar <pralabhku...@gmail.com> wrote:
Hi developers, users,

Spark is built using log4j 1.2.17. Is there a plan to upgrade based on the 
recently detected CVE?


Regards
Pralabh kumar


Re: start-history-server.sh doesn't survive system reboot. Recommendation?

2021-12-08 Thread James Yu
The Ops guy would probably be fired if he doesn't make sure the container 
runtime is up 24/7.  

From: Mich Talebzadeh 
Sent: Wednesday, December 8, 2021 12:33 PM
Cc: user @spark 
Subject: Re: start-history-server.sh doesn't survive system reboot. 
Recommendation?

Well, that is just relying on the docker daemon to start after reboot. It may not:

docker ps
Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the 
docker daemon running?
systemctl start docker
docker ps
CONTAINER ID   IMAGE   COMMAND   CREATED   STATUS   PORTS   NAMES

So your mileage varies. A proper way would be to have two instances of 
start-history-server.sh running on different hosts listening to the same Spark, 
thus avoiding a single point of failure.

HTH



 




On Wed, 8 Dec 2021 at 19:45, James Yu <ja...@ispot.tv> wrote:
Just thought about another possibility which is to containerize the history 
server and run the container with proper restart policy.  This may be the 
approach we will be taking because the deployment of such HS would be more 
flexible.

Thanks!

From: Sean Owen <sro...@gmail.com>
Sent: Tuesday, December 7, 2021 1:29 PM
To: James Yu <ja...@ispot.tv>
Cc: user @spark <user@spark.apache.org>
Subject: Re: start-history-server.sh doesn't survive system reboot. 
Recommendation?

The scripts just launch the processes. To make any process restart on system 
restart, you would need to set it up as a system service (i.e. controlled by 
'service'). That's easy enough but out of scope for Spark itself.

On Tue, Dec 7, 2021 at 3:23 PM James Yu <ja...@ispot.tv> wrote:
Hi Users,

We found that the history server launched by using the 
"start-history-server.sh" command does not survive system reboot.  Any 
recommendations for keeping it up even after a reboot?

Thanks,

James


Re: start-history-server.sh doesn't survive system reboot. Recommendation?

2021-12-08 Thread James Yu
Just thought about another possibility which is to containerize the history 
server and run the container with proper restart policy.  This may be the 
approach we will be taking because the deployment of such HS would be more 
flexible.

Thanks!

From: Sean Owen 
Sent: Tuesday, December 7, 2021 1:29 PM
To: James Yu 
Cc: user @spark 
Subject: Re: start-history-server.sh doesn't survive system reboot. 
Recommendation?

The scripts just launch the processes. To make any process restart on system 
restart, you would need to set it up as a system service (i.e. controlled by 
'service'). That's easy enough but out of scope for Spark itself.

On Tue, Dec 7, 2021 at 3:23 PM James Yu <ja...@ispot.tv> wrote:
Hi Users,

We found that the history server launched by using the 
"start-history-server.sh" command does not survive system reboot.  Any 
recommendations for keeping it up even after a reboot?

Thanks,

James


start-history-server.sh doesn't survive system reboot. Recommendation?

2021-12-07 Thread James Yu
Hi Users,

We found that the history server launched by using the 
"start-history-server.sh" command does not survive system reboot.  Any 
recommendations for keeping it up even after a reboot?

Thanks,

James


Re: Performance Problems Migrating to S3A Committers

2021-08-05 Thread James Yu
See this ticket https://issues.apache.org/jira/browse/HADOOP-17201.  It may 
help your team.

From: Johnny Burns 
Sent: Tuesday, June 22, 2021 3:41 PM
To: user@spark.apache.org 
Cc: data-orchestration-team 
Subject: Performance Problems Migrating to S3A Committers

Hello.

I’m Johnny, I work at Stripe. We’re heavy Spark users and we’ve been exploring 
using s3 committers. Currently we first write the data to HDFS and then upload 
it to S3. However, now with S3 offering strong consistency guarantees, we are 
evaluating if we can write data directly to S3.

We’re having some troubles with performance, so hoping someone might have some 
guidance which can unblock this.

File Format
We are using parquet as the File Format. We do have iceberg tables as well, and 
they are indeed able to commit directly to S3 (with minimal local disk usage). 
We can’t migrate all of our jobs to iceberg right now. Hence, we are looking 
for a committer that is performant and can directly write parquet files to S3 
(with minimal local disk usage).
What have we tried?
We’ve tried using both the “magic” and “directory” committers. We're setting 
the following configs (in addition to the "magic/directory" committer.name).


"spark.hadoop.fs.s3a.committer.magic.enabled":"true",


"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",


"spark.sql.sources.commitProtocolClass":"org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",


"spark.sql.parquet.output.committer.class":"org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",

Both committers have shown performance regressions on large jobs. We’re 
currently focused on trying to make the directory committer work because we’ve 
seen fewer slowdowns with that one, but I’ll describe the problems with each.
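
As a rough sketch, the committer settings listed above can also be assembled programmatically. This assumes the spark-hadoop-cloud module is on the classpath (it provides the two org.apache.spark.internal.io.cloud classes) and picks "directory" for the committer name purely as an example; the value name committerConf is illustrative and not from this thread.

```scala
import org.apache.spark.SparkConf

// Illustrative only: "directory" could equally be "magic".
// SparkConf(false) skips loading spark.* system properties, so only these keys are set.
val committerConf: SparkConf = new SparkConf(false).setAll(Map(
  "spark.hadoop.fs.s3a.committer.name" -> "directory",
  "spark.hadoop.fs.s3a.committer.magic.enabled" -> "true",
  "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" ->
    "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
  "spark.sql.sources.commitProtocolClass" ->
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
  "spark.sql.parquet.output.committer.class" ->
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"
))
```

Such a conf could then be passed to SparkSession.builder().config(committerConf) when the session is created.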

We’ve been testing the committers on a large job with 100k tasks (creating 
7.3TB of output).
Observations for magic committer

Using the magic committer, we see slowdowns in two places:


  *   S3 Writing (inside the task)

      *   The slowdown seems to occur just after the s3 multipart write. The 
          finishedWrite function tries to do some cleanup and kicks off the 
          deleteUnnecessaryFakeDirectories function.

      *   This causes 503’s due to hitting AWS rate limits on 
          com.amazonaws.services.s3.model.DeleteObjectsRequest

      *   I'm not sure what directories are actually getting cleaned up here (I 
          assume the _magic directories are still needed up until the job commit).

  *   Job Commit

      *   Have not dug down into the details here, but assume it is something 
          similar to what we’re seeing in the directory committer case below.

Observations for directory committer

We’ve observed that the “directory” s3committer performance is on-par with our 
existing HDFS commit for task execution and task commit. The slowdowns we’re 
seeing are in the job commit phase.

The job commit happens almost instantaneously in the HDFS case, vs taking about 
an hour for the s3 directory committer.

We’ve enabled DEBUG logging for the s3 committer. It seems like that hour is 
mostly spent doing things which you would expect (completing 100k 
delayedComplete s3 uploads). I've attached an example of some of the logs we 
see repeated over-and-over during the 1 hour job commit (I redacted some of the 
directories and SHAs but the logs are otherwise unchanged).

One thing I notice is that we see object_delete_requests += 1 in the logs. I’m 
not sure if that means it’s doing an s3 delete, or it is deleting the HDFS 
manifest files (to clean up the task).

Alternatives - Should we check out directCommitter?
We’ve also considered using the directCommitter. We understand that the 
directCommitter is discouraged because it does not support speculative 
execution (and for some failure cases). Given that we do not use speculative 
execution at Stripe, would the directCommitter be a viable option for us? What 
are the failure scenarios to consider?

Alternatives - Can S3FileIO work well with parquet files?

Netflix has a tool called s3FileIO. 
We’re wondering if it can be used with Spark, or only with Iceberg.


Re: Poor performance caused by coalesce to 1

2021-02-03 Thread James Yu
Hi Silvio,

The result file is less than 50 MB in size so I think it is small and 
acceptable enough for one task to write.

Your suggestion sounds interesting. Could you guide us further on how to easily 
"add a stage boundary"?

Thanks

From: Silvio Fiorito 
Sent: Wednesday, February 3, 2021 11:05 AM
To: James Yu ; user 
Subject: Re: Poor performance caused by coalesce to 1


Coalesce is reducing the parallelization of your last stage, in your case to 1 
task. So, it’s natural it will give poor performance especially with large 
data. If you absolutely need a single file output, you can instead add a stage 
boundary and use repartition(1). This will give your query full parallelization 
during processing while at the end giving you a single task that writes data 
out. Note that if the file is large (e.g. 1 GB or more) you’ll probably still 
notice slowness while writing. You may want to reconsider the 1-file 
requirement for larger datasets.
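
A minimal sketch of the repartition(1) suggestion, assuming the aggregated result is a DataFrame and Parquet output is acceptable. The helper name writeSingleFile, the sample aggregation, and the /tmp path are illustrative and not from this thread. Unlike coalesce(1), repartition(1) inserts a shuffle, so the stages before it keep their own parallelism and only the final write runs as one task.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: the shuffle introduced by repartition(1) is the stage boundary.
def writeSingleFile(df: DataFrame, path: String): Unit =
  df.repartition(1)      // upstream stages keep their partitioning
    .write
    .mode("overwrite")
    .parquet(path)       // the write stage runs as a single task

// Toy local session and aggregation, only to make the sketch runnable.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("single-file-write")
  .getOrCreate()
import spark.implicits._

val aggregated = spark.range(0, 100000)
  .groupBy(($"id" % 10).as("bucket"))
  .count()

writeSingleFile(aggregated, "/tmp/single-file-demo")
```

For an RDD, rdd.coalesce(1, shuffle = true) has the same effect as rdd.repartition(1).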



From: James Yu 
Date: Wednesday, February 3, 2021 at 1:54 PM
To: user 
Subject: Poor performance caused by coalesce to 1



Hi Team,



We are running into this poor performance issue and seeking your suggestion on 
how to improve it:



We have a particular dataset which we aggregate from other datasets and would 
like to write out to one single file (because it is small enough).  After 
a series of transformations (GROUP BYs, FLATMAPs), we coalesce the final RDD 
to 1 partition before writing it out, and we found that this coalesce degrades 
performance. It is not that the coalesce operation itself takes additional 
runtime; rather, it somehow dictates the number of partitions used in the 
upstream transformations.



We hope there is a simple and useful way to solve this kind of issue which we 
believe is quite common for many people.





Thanks



James


Poor performance caused by coalesce to 1

2021-02-03 Thread James Yu
Hi Team,

We are running into this poor performance issue and seeking your suggestion on 
how to improve it:

We have a particular dataset which we aggregate from other datasets and would 
like to write out to one single file (because it is small enough).  After 
a series of transformations (GROUP BYs, FLATMAPs), we coalesce the final RDD 
to 1 partition before writing it out, and we found that this coalesce degrades 
performance. It is not that the coalesce operation itself takes additional 
runtime; rather, it somehow dictates the number of partitions used in the 
upstream transformations.

We hope there is a simple and useful way to solve this kind of issue which we 
believe is quite common for many people.


Thanks

James


Re: Where do the executors get my app jar from?

2020-08-14 Thread James Yu
Henoc,

Ok. That is for Yarn with HDFS. What happens when Kubernetes is the resource 
manager and there is no HDFS?

James


From: Henoc 
Sent: Thursday, August 13, 2020 10:45 PM
To: James Yu 
Cc: user ; russell.spit...@gmail.com 

Subject: Re: Where do the executors get my app jar from?

If you are running Spark on Yarn, the spark-submit utility will download the 
jar from S3 and copy it to HDFS in a distributed cache. The driver shares this 
location with Yarn NodeManagers via the container LaunchContext. NodeManagers 
localize the jar and place it on container classpath before they launch the 
executor container

Henoc

On Fri, Aug 14, 2020, 6:19 AM Russell Spitzer <russell.spit...@gmail.com> wrote:
Looking back at the code

All --jar Args and such run through

https://github.com/apache/spark/blob/7f275ee5978e00ac514e25f5ef1d4e3331f8031b/core/src/main/scala/org/apache/spark/SparkContext.scala#L493-L500

Which calls

https://github.com/apache/spark/blob/7f275ee5978e00ac514e25f5ef1d4e3331f8031b/core/src/main/scala/org/apache/spark/SparkContext.scala#L1842

Which places local jars on the driver hosted file server and just leaves Remote 
Jars as is with the path for executors to access them

On Thu, Aug 13, 2020 at 11:01 PM Russell Spitzer <russell.spit...@gmail.com> wrote:
The driver hosts a file server which the executors download the jar from.

On Thu, Aug 13, 2020, 5:33 PM James Yu <ja...@ispot.tv> wrote:
Hi,

When I spark submit a Spark app with my app jar located in S3, obviously the 
Driver will download the jar from the s3 location.  What is not clear to me is: 
where do the Executors get the jar from?  From the same s3 location, or somehow 
from the Driver, or they don't need the jar?

Thanks in advance for explanation.

James


Where do the executors get my app jar from?

2020-08-13 Thread James Yu
Hi,

When I spark submit a Spark app with my app jar located in S3, obviously the 
Driver will download the jar from the s3 location.  What is not clear to me is: 
where do the Executors get the jar from?  From the same s3 location, or somehow 
from the Driver, or they don't need the jar?

Thanks in advance for explanation.

James


Re: [Spark ML] existence of Matrix Factorization ALS algorithm's log version

2020-07-29 Thread James Yuan
Thanks for your quick reply. I'll hack it if needed :)

James






Re: NoClassDefFoundError: scala/Product$class

2020-06-06 Thread James Moore
How are you depending on that org.bdgenomics.adam library?  Maybe you're
pulling the 2.11 version of that.


Re: Spark driver thread

2020-03-06 Thread James Yu
Pol, thanks for your reply.

Actually I am running Spark apps in CLUSTER mode. Is what you said still 
applicable in cluster mode?  Thanks in advance for your further clarification.


From: Pol Santamaria 
Sent: Friday, March 6, 2020 12:59 AM
To: James Yu 
Cc: user@spark.apache.org 
Subject: Re: Spark driver thread

Hi james,

You can configure the Spark Driver to use more than a single thread. It is 
something that depends on the application, but the Spark driver can take 
advantage of multiple threads in many situations. For instance, when the driver 
program gathers or sends data to the workers.

So yes, if you do computation or I/O on the driver side, you should explore 
using multithreads and more than 1 vCPU.
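
A small sketch of what driver-side multithreading can look like, assuming a local SparkSession and toy jobs (the names jobs and counts and the sizes are illustrative, not from this thread). Spark's scheduler accepts jobs submitted from separate driver threads, so each Future below triggers its own job.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.sql.SparkSession

// Toy local session, only to make the sketch runnable.
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("driver-threads")
  .getOrCreate()

// Each Future runs on a driver-side thread and submits an independent Spark job.
val jobs: Seq[Future[Long]] = Seq(10L, 100L, 1000L).map { n =>
  Future { spark.range(n).count() }
}

// Block the main driver thread until all concurrently submitted jobs finish.
val counts: Seq[Long] = Await.result(Future.sequence(jobs), 5.minutes)
```

Whether extra driver vCPUs pay off still depends on how much of this kind of work the application really does on the driver.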

Bests,
Pol Santamaria

On Fri, Mar 6, 2020 at 1:28 AM James Yu <ja...@ispot.tv> wrote:
Hi,

Does a Spark driver always work single-threaded?

If yes, does it mean asking for more than one vCPU for the driver is wasteful?


Thanks,
James


Spark driver thread

2020-03-05 Thread James Yu
Hi,

Does a Spark driver always work single-threaded?

If yes, does it mean asking for more than one vCPU for the driver is wasteful?


Thanks,
James


[Spark SQL] dependencies to use test helpers

2019-07-24 Thread James Pirz
I have a Scala application in which I have added some extra rules to
Catalyst.
While adding some unit tests, I am trying to use some existing functions
from Catalyst's test code: Specifically comparePlans() and normalizePlan()
under PlanTestBase
<https://github.com/apache/spark/blob/fced6696a7713a5dc117860faef43db6b81d07b3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala>
[1].

I am just wondering which additional dependencies I need to add to my
project to access them. Currently, I have below dependencies but they do
not cover above APIs.

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.4.3"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.4.3"
libraryDependencies += "org.apache.spark" % "spark-catalyst_2.11" % "2.4.3"


[1] 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala

Thanks,
James


Re: Announcing Delta Lake 0.2.0

2019-06-20 Thread James Cotrotsios
Is there a plan to have a business catalog component for the Data Lake? If
not, how would someone make a proposal to create an open source project
related to that? I would be interested in building out an open source data
catalog that would use the Hive metadata store as a baseline for technical
metadata.


On Wed, Jun 19, 2019 at 3:04 PM Liwen Sun  wrote:

> We are delighted to announce the availability of Delta Lake 0.2.0!
>
> To try out Delta Lake 0.2.0, please follow the Delta Lake Quickstart:
> https://docs.delta.io/0.2.0/quick-start.html
>
> To view the release notes:
> https://github.com/delta-io/delta/releases/tag/v0.2.0
>
> This release introduces two main features:
>
> *Cloud storage support*
> In addition to HDFS, you can now configure Delta Lake to read and write
> data on cloud storage services such as Amazon S3 and Azure Blob Storage.
> For configuration instructions, please see:
> https://docs.delta.io/0.2.0/delta-storage.html
>
> *Improved concurrency*
> Delta Lake now allows concurrent append-only writes while still ensuring
> serializability. For concurrency control in Delta Lake, please see:
> https://docs.delta.io/0.2.0/delta-concurrency.html
>
> We have also greatly expanded the test coverage as part of this release.
>
> We would like to acknowledge all community members for contributing to
> this release.
>
> Best regards,
> Liwen Sun
>
>


Parallel read parquet file, write to postgresql

2018-12-03 Thread James Starks
I'm reading the Spark docs 
(https://spark.apache.org/docs/latest/sql-data-sources-parquet.html), but they 
don't mention how to read a parquet file in parallel with SparkSession. Would 
--num-executors just work? Do any additional parameters need to be added to 
SparkSession as well?

Also, if I want to write data to a database in parallel, would the options 
'numPartitions' and 'batchsize' be enough to improve write performance? For 
example,

 mydf.write.format("jdbc").
 option("driver", "org.postgresql.Driver").
 option("url", url).
 option("dbtable", table_name).
 option("user", username).
 option("password", password).
 option("numPartitions", N).
 option("batchsize", M).
 save()

On the Spark website 
(https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#jdbc-to-other-databases), 
I can only find these two parameters that would have an impact on db write 
performance.

I appreciate any suggestions.

Re: Convert RDD[Iterrable[MyCaseClass]] to RDD[MyCaseClass]

2018-12-03 Thread James Starks
Following your advice to use flatMap, I can now convert the result from 
RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]. Basically I just perform a 
flatMap at the end, before converting the RDD back to a DF (i.e. 
SparkSession.createDataFrame(rddRecordsOfMyCaseClass)). For instance,

df.map { ... }.filter{ ... }.flatMap { records => records.flatMap { record => 
Seq(record) } }

Not smart code, but it works for my case.
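
The nested flatMap above only rewraps each record in a Seq and unwraps it again, so a single flatMap that returns the inner collection unchanged should give the same result. A sketch on plain Scala collections (the sample values are made up); RDD.flatMap accepts the same function shape:

```scala
case class MyCaseClass(i: Int)

// Stand-in for the RDD[Iterable[MyCaseClass]] contents; values are illustrative.
val nested: Seq[Iterable[MyCaseClass]] = Seq(
  (1 to 3).map(MyCaseClass(_)),
  (4 to 5).map(MyCaseClass(_))
)

// The form used in the message: wrap every record in a Seq, then flatten twice.
val verbose = nested.flatMap { records => records.flatMap { record => Seq(record) } }

// Equivalent: hand each inner collection straight to flatMap.
val concise = nested.flatMap(records => records)
```

On the RDD this would read rdd.flatMap(records => records), which matches the flatMap(_.toList) advice quoted below.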

Thanks for the advice!

‐‐‐ Original Message ‐‐‐
On Saturday, December 1, 2018 12:17 PM, Chris Teoh  wrote:

> Hi James,
>
> Try flatMap (_.toList). See below example:-
>
> scala> case class MyClass(i:Int)
> defined class MyClass
>
> scala> val r = 1 to 100
> r: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 
> 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 
> 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 
> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 
> 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 
> 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
>
> scala> val r2 = 101 to 200
> r2: scala.collection.immutable.Range.Inclusive = Range(101, 102, 103, 104, 
> 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 
> 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 
> 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 
> 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 
> 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 
> 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 
> 195, 196, 197, 198, 199, 200)
>
> scala> val c1 = r.map(MyClass(_)).toIterable
> c1: Iterable[MyClass] = Vector(MyClass(1), MyClass(2), MyClass(3), 
> MyClass(4), MyClass(5), MyClass(6), MyClass(7), MyClass(8), MyClass(9), 
> MyClass(10), MyClass(11), MyClass(12), MyClass(13), MyClass(14), MyClass(15), 
> MyClass(16), MyClass(17), MyClass(18), MyClass(19), MyClass(20), MyClass(21), 
> MyClass(22), MyClass(23), MyClass(24), MyClass(25), MyClass(26), MyClass(27), 
> MyClass(28), MyClass(29), MyClass(30), MyClass(31), MyClass(32), MyClass(33), 
> MyClass(34), MyClass(35), MyClass(36), MyClass(37), MyClass(38), MyClass(39), 
> MyClass(40), MyClass(41), MyClass(42), MyClass(43), MyClass(44), MyClass(45), 
> MyClass(46), MyClass(47), MyClass(48), MyClass(49), MyClass(50), MyClass(51), 
> MyClass(52), MyClass(53), MyClass(54), MyClass(55), MyClass(56), MyClass(57), 
> MyClass(58), MyClass(59), MyClass(...
>
> scala> val c2 = r2.map(MyClass(_)).toIterable
> c2: Iterable[MyClass] = Vector(MyClass(101), MyClass(102), MyClass(103), 
> MyClass(104), MyClass(105), MyClass(106), MyClass(107), MyClass(108), 
> MyClass(109), MyClass(110), MyClass(111), MyClass(112), MyClass(113), 
> MyClass(114), MyClass(115), MyClass(116), MyClass(117), MyClass(118), 
> MyClass(119), MyClass(120), MyClass(121), MyClass(122), MyClass(123), 
> MyClass(124), MyClass(125), MyClass(126), MyClass(127), MyClass(128), 
> MyClass(129), MyClass(130), MyClass(131), MyClass(132), MyClass(133), 
> MyClass(134), MyClass(135), MyClass(136), MyClass(137), MyClass(138), 
> MyClass(139), MyClass(140), MyClass(141), MyClass(142), MyClass(143), 
> MyClass(144), MyClass(145), MyClass(146), MyClass(147), MyClass(148), 
> MyClass(149), MyClass(150), MyClass(151), MyClass(152), MyClass(153), 
> MyClass(154), MyClass(15...
> scala> val rddIt = sc.parallelize(Seq(c1,c2))
> rddIt: org.apache.spark.rdd.RDD[Iterable[MyClass]] = ParallelCollectionRDD[2] 
> at parallelize at :28
>
> scala> rddIt.flatMap(_.toList)
> res4: org.apache.spark.rdd.RDD[MyClass] = MapPartitionsRDD[3] at flatMap at 
> :26
>
> res4 is what you're looking for.
>
> On Sat, 1 Dec 2018 at 21:09, Chris Teoh  wrote:
>
>> Do you have the full code example?
>>
>> I think this would be similar to the mapPartitions code flow, something like 
>> flatMap( _ =>  _.toList )
>>
>> I haven't yet tested this out but this is how I'd first try.
>>
>> On Sat, 1 Dec 2018 at 01:02, James Starks  
>> wrote:
>>
>>> When processing data, I create an instance of RDD[Iterable[MyCaseClass]] 
>>> and I want to convert it to RDD[MyCaseClass] so that it can be further 
>>> converted to dataset or dataframe with toDS() function. But I encounter a 
>>> problem that SparkContext can not be instantiated within SparkSession.map 
>>> function because it already exists, even with allowMultipleContexts set to 
>>> true.
>>>
>>> val sc = new SparkConf()
>>> sc.set("spark.driver.allowMultipleContexts", "true")
>>> new SparkContext(sc).parallelize(seq)
>>>
>>> How can I fix this?
>>>
>>> Thanks.
>>
>> --
>> Chris
>
> --
> Chris

Re: Caused by: java.io.NotSerializableException: com.softwaremill.sttp.FollowRedirectsBackend

2018-11-30 Thread James Starks
Shadowed with

object MyObject {
  def mymethod(param: MyParam) = actual_function(param)
}
class MyObject {
  import MyObject._
  session.map { ... =>
 mymethod(...)
  }
}

does the job.

Thanks for the advice!
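
The lazy suggestion quoted below is often written as a @transient lazy val. A sketch with plain Java serialization, where HttpBackend is a hypothetical stand-in for a non-serializable client such as the sttp backend, and Fetcher and roundTrip are illustrative names:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical non-serializable client (note: does not extend Serializable).
class HttpBackend {
  def get(url: String): String = s"response from $url"
}

class Fetcher extends Serializable {
  // Skipped during serialization; rebuilt on first use after deserialization.
  @transient lazy val backend: HttpBackend = new HttpBackend
  def fetch(url: String): String = backend.get(url)
}

// Serialize and deserialize a value, roughly what happens to a task closure.
def roundTrip[T <: AnyRef](value: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(value)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}

val fetcher = new Fetcher
fetcher.fetch("http://example.invalid/a") // forces the lazy val before serialization
val copy = roundTrip(fetcher)             // succeeds because backend is transient
```

Without @transient, the initialized backend field would be part of the serialized object and writeObject would fail with NotSerializableException.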

‐‐‐ Original Message ‐‐‐
On Friday, November 30, 2018 9:26 AM,  wrote:

> If it’s just a couple of classes and they are actually suitable for 
> serializing and you have the source code then you can shadow them in your own 
> project with the serializable interface added. Your shadowed classes should 
> be on the classpath before the library’s versions which should lead to spark 
> being able to use the serializable versions.
>
> That’s very much a last resort though!
>
> Chris
>
> On 30 Nov 2018, at 05:08, Koert Kuipers  wrote:
>
>> if you only use it in the executors sometimes using lazy works
>>
>> On Thu, Nov 29, 2018 at 9:45 AM James Starks 
>>  wrote:
>>
>>> This is not a problem caused directly by Spark, but it is related, so I am 
>>> asking here. I use Spark to read data from Parquet and make some HTTP calls 
>>> with sttp (https://github.com/softwaremill/sttp). However, Spark throws
>>>
>>> Caused by: java.io.NotSerializableException: 
>>> com.softwaremill.sttp.FollowRedirectsBackend
>>>
>>> I understand why the exception is thrown: FollowRedirectsBackend is not 
>>> serializable. So I would like to know whether there is any way to get 
>>> around this problem without modifying and recompiling the original code.
>>>
>>> Thanks

Convert RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]

2018-11-30 Thread James Starks
When processing data, I create an instance of RDD[Iterable[MyCaseClass]] and 
want to convert it to RDD[MyCaseClass] so that it can be further converted to a 
Dataset or DataFrame with the toDS() function. But I hit the problem that a 
SparkContext cannot be instantiated within a SparkSession.map function because 
one already exists, even with allowMultipleContexts set to true.

val sc = new SparkConf()
sc.set("spark.driver.allowMultipleContexts", "true")
new SparkContext(sc).parallelize(seq)

How can I fix this?

Thanks.
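
A minimal sketch of the flattening asked about here, assuming Spark is on the 
classpath and a SparkSession already exists. MyCaseClass is given made-up 
fields and FlattenExample is a hypothetical name. The idea is that flatMap on 
the existing RDD unpacks each Iterable, so no second SparkContext is needed:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class; the fields are placeholders, not from the post.
case class MyCaseClass(id: Int, name: String)

object FlattenExample {
  // Turns RDD[Iterable[MyCaseClass]] into Dataset[MyCaseClass].
  def flatten(spark: SparkSession,
              nested: RDD[Iterable[MyCaseClass]]): Dataset[MyCaseClass] = {
    import spark.implicits._ // brings the case class encoder and toDS() into scope
    // Each Iterable is unpacked into its elements by flatMap.
    nested.flatMap(group => group).toDS()
  }
}
```

The existing RDD would be passed in directly, e.g. 
FlattenExample.flatten(spark, rddOfIterables).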

Caused by: java.io.NotSerializableException: com.softwaremill.sttp.FollowRedirectsBackend

2018-11-29 Thread James Starks
This is not a problem caused directly by Spark, but it is related, so I am 
asking here. I use Spark to read data from Parquet and make some HTTP calls with 
sttp (https://github.com/softwaremill/sttp). However, Spark throws

Caused by: java.io.NotSerializableException: 
com.softwaremill.sttp.FollowRedirectsBackend

I understand why the exception is thrown: FollowRedirectsBackend is not 
serializable. So I would like to know whether there is any way to get around 
this problem without modifying and recompiling the original code.

Thanks
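
The "lazy" idea mentioned in the replies can be illustrated without Spark, 
using plain Java serialization. This is only a sketch: HttpBackend is a 
hypothetical stand-in for a non-serializable client (it is not the sttp API), 
and EagerTask, LazyTask and SerializationCheck are made-up names. A field 
marked @transient lazy val is skipped when the enclosing object is serialized 
and is rebuilt on first use wherever the object ends up:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a third-party client that does not implement Serializable.
class HttpBackend {
  def get(url: String): String = s"GET $url"
}

// Holds the backend as an ordinary field, so serializing the task fails.
class EagerTask(val backend: HttpBackend) extends Serializable {
  def run(url: String): String = backend.get(url)
}

// Creates the backend lazily; the field is excluded from serialization.
class LazyTask extends Serializable {
  @transient lazy val backend: HttpBackend = new HttpBackend
  def run(url: String): String = backend.get(url)
}

object SerializationCheck {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Keeping the client in an object (as in the MyObject reply) has a similar 
effect, since an object is initialized per JVM rather than shipped with the 
closure.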

Re: Spark job's driver program consumes too much memory

2018-09-07 Thread James Starks
Yes, I think I am confused, because my original thought was that if an executor 
only requires 10g, then the driver ideally should not need more than 10g, or at 
least not more than 20g. But this is not the case. My configuration sets 
--driver-memory to 25g and --executor-memory to 10g. And my program basically 
only uses `filter`, `map`, and `write.mode().parquet`, as below (main logic):

val df = spark.read.format("jdbc")...option("dbtable", "select * from 
mytable where fieldX <> ''")...load() /* sql returns around 8MM records. */
df.createOrReplaceTempView("newtable")
val newdf = spark.sql("select field1, ..., fieldN from newtable" /* around 
50 fields */).as[MyCaseClass].filter {...}.map { ... }.filter { ... }
newdf.write.mode(...).parquet(...)

So I don't understand why the driver program needs so much memory. I also 
cannot find any documentation explaining this, either on the Spark website or 
through Google (perhaps I am using the wrong keywords). Is there any place 
that may contain a pointer to this?

I appreciate your help.


‐‐‐ Original Message ‐‐‐
On 7 September 2018 4:46 PM, Apostolos N. Papadopoulos  
wrote:

> You are mixing everything together, and this does not make sense. Writing
> data to HDFS does not require that all data be transferred back to the
> driver and THEN saved to HDFS.
>
> This would be a disaster and it would never scale. I suggest checking
> the documentation more carefully because I believe you are a bit confused.
>
> regards,
>
> Apostolos
>
> On 07/09/2018 05:39 μμ, James Starks wrote:
>
> > Is df.write.mode(...).parquet("hdfs://..") also an action? Checking the 
> > doc shows that my Spark job doesn't use any of those actions. But the save 
> > functions resemble the call 
> > df.write.mode(overwrite).parquet("hdfs://path/to/parquet-file") that my 
> > Spark job uses. Therefore I am thinking maybe that's the reason why my 
> > Spark job's driver consumes such an amount of memory.
> >
> > https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions
> >
> > My Spark job's driver program consumes too much memory, so I want to 
> > prevent that by writing data to HDFS on the executor side, instead of 
> > waiting for the data to be sent back to the driver program (and then 
> > written to HDFS). This is because our worker servers have more memory than 
> > the one that runs the driver program. If I can write data to HDFS from the 
> > executors, then the driver memory for my Spark job can be reduced.
> >
> > Otherwise, does Spark support streaming reads from a database (i.e. Spark 
> > Streaming + Spark SQL)?
> >
> > Thanks for your reply.
> > ‐‐‐ Original Message ‐‐‐
> > On 7 September 2018 4:15 PM, Apostolos N. Papadopoulos papad...@csd.auth.gr 
> > wrote:
> >
> > > Dear James,
> > >
> > > -   check the Spark documentation to see the actions that return a lot of
> > > data back to the driver. One of these actions is collect(). However,
> > > take(x) is an action, also reduce() is an action.
> > > Before executing collect() find out what is the size of your RDD/DF.
> > >
> > > -   I cannot understand the phrase "hdfs directly from the executor". You
> > > can specify an hdfs file as your input and also you can use hdfs to
> > > store your output.
> > > regards,
> > > Apostolos
> > > On 07/09/2018 05:04 μμ, James Starks wrote:
> > >
> > >
> > > > I have a Spark job that reads data from a database. By increasing the
> > > > submit parameter '--driver-memory 25g' the job works without a problem
> > > > locally, but not in the prod env because the prod master does not have
> > > > enough capacity.
> > > > So I have a few questions:
> > > > -   Which functions, such as collect(), would cause the data to be sent
> > > > back to the driver program?
> > > >   My job so far merely uses `as`, `filter`, `map`, and `filter`.
> > > >
> > > > -   Is it possible to write data (in parquet format for instance) to
> > > > hdfs directly from the executor? If so, how can I do it (any code
> > > > snippet, doc for reference, or keyword to search for, because I can't
> > > > find anything with e.g. `spark direct executor hdfs write`)?
> > > >
> > > >
> > > > Thanks
> > > > --
> > >
> > > Apostolos N. Papadopoulos, Associate Professor
> > > Department of Informatics
> > > Aristotle University of Thessaloniki
> > > Thessaloniki, GREECE
> > > tel: ++0030312310991918
> > > email: papad...@csd.auth.gr
> > > twitter: @papadopoulos_ap
> > > web: http://datalab.csd.auth.gr/~apostol
> > >
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Spark job's driver program consumes too much memory

2018-09-07 Thread James Starks


Is df.write.mode(...).parquet("hdfs://..") also an action? Checking the doc 
shows that my Spark job doesn't use any of those actions. But the save 
functions resemble the call 
df.write.mode(overwrite).parquet("hdfs://path/to/parquet-file") that my Spark 
job uses. Therefore I am thinking maybe that's the reason why my Spark job's 
driver consumes such an amount of memory.

https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions

My Spark job's driver program consumes too much memory, so I want to prevent 
that by writing data to HDFS on the executor side, instead of waiting for the 
data to be sent back to the driver program (and then written to HDFS). This is 
because our worker servers have more memory than the one that runs the driver 
program. If I can write data to HDFS from the executors, then the driver memory 
for my Spark job can be reduced.

Otherwise, does Spark support streaming reads from a database (i.e. Spark 
Streaming + Spark SQL)?

Thanks for your reply.



‐‐‐ Original Message ‐‐‐
On 7 September 2018 4:15 PM, Apostolos N. Papadopoulos  
wrote:

> Dear James,
>
> -   check the Spark documentation to see the actions that return a lot of
> data back to the driver. One of these actions is collect(). However,
> take(x) is an action, also reduce() is an action.
>
> Before executing collect() find out what is the size of your RDD/DF.
>
> -   I cannot understand the phrase "hdfs directly from the executor". You
> can specify an hdfs file as your input and also you can use hdfs to
> store your output.
>
> regards,
>
> Apostolos
>
> On 07/09/2018 05:04 μμ, James Starks wrote:
>
>
> > I have a Spark job that reads data from a database. By increasing the
> > submit parameter '--driver-memory 25g' the job works without a problem
> > locally, but not in the prod env because the prod master does not have
> > enough capacity.
> > So I have a few questions:
> > -   Which functions, such as collect(), would cause the data to be sent
> > back to the driver program?
> >   My job so far merely uses `as`, `filter`, `map`, and `filter`.
> >
> > -   Is it possible to write data (in parquet format for instance) to
> > hdfs directly from the executor? If so, how can I do it (any code snippet,
> > doc for reference, or keyword to search for, because I can't find anything
> > with e.g. `spark direct executor hdfs write`)?
> >
> >
> > Thanks

Spark job's driver program consumes too much memory

2018-09-07 Thread James Starks
I have a Spark job that reads data from a database. By increasing the submit 
parameter '--driver-memory 25g' the job works without a problem locally, but 
not in the prod env because the prod master does not have enough capacity.

So I have a few questions:

-  Which functions, such as collect(), would cause the data to be sent back to 
the driver program?
  My job so far merely uses `as`, `filter`, `map`, and `filter`.

- Is it possible to write data (in parquet format for instance) to hdfs 
directly from the executor? If so, how can I do it (any code snippet, doc for 
reference, or keyword to search for, because I can't find anything with e.g. 
`spark direct executor hdfs write`)?

Thanks
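
As the replies in this thread point out, df.write...parquet(...) is carried out 
by the tasks themselves and does not collect rows on the driver. Something that 
may matter more for memory is how the JDBC read is split up. The sketch below 
is plain Scala that only assembles reader options for a partitioned JDBC read. 
The option keys are Spark's JDBC data source options; the object name, the 
"src" alias, and the default fetch size are assumptions for illustration.

```scala
object JdbcReadOptions {
  // Builds options for a JDBC read that is split into `numPartitions` range
  // queries on a numeric column, instead of one query returning every row.
  def partitioned(url: String,
                  query: String,
                  partitionColumn: String,
                  lowerBound: Long,
                  upperBound: Long,
                  numPartitions: Int,
                  fetchSize: Int = 1000): Map[String, String] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(lowerBound < upperBound, "lowerBound must be below upperBound")
    Map(
      "url" -> url,
      // Spark expects a table name or a parenthesized subquery with an alias.
      "dbtable" -> s"($query) AS src",
      "partitionColumn" -> partitionColumn,
      "lowerBound" -> lowerBound.toString,
      "upperBound" -> upperBound.toString,
      "numPartitions" -> numPartitions.toString,
      // Hint to the JDBC driver for how many rows to fetch per round trip.
      "fetchsize" -> fetchSize.toString
    )
  }
}
```

The resulting map would be handed to the reader with 
spark.read.format("jdbc").options(opts).load(). Whether this lowers memory use 
in a particular job depends on the data and the JDBC driver.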

Re: [External Sender] How to debug Spark job

2018-09-07 Thread James Starks
I eventually found the root cause: the job throws java.lang.OutOfMemoryError: 
Java heap space. Increasing --driver-memory fixes the problem for now. Thanks.

‐‐‐ Original Message ‐‐‐
On 7 September 2018 12:32 PM, Femi Anthony  
wrote:

> One way I would go about this would be to run newdf.show(numRows, 
> truncate = false) on a few columns before writing to parquet, to force 
> computation of newdf and see whether the hang occurs at that point or 
> during the write. You could also try newdf.count().
>
> Femi
>
> On Fri, Sep 7, 2018 at 5:48 AM James Starks  
> wrote:
>
>> I have a Spark job that reads from a postgresql (v9.5) table and writes the 
>> result to parquet. The code flow is not complicated, basically
>>
>> case class MyCaseClass(field1: String, field2: String)
>> val df = spark.read.format("jdbc")...load()
>> df.createOrReplaceTempView(...)
>> val newdf = spark.sql("select field1, field2 from mytable")
>>   .as[MyCaseClass].map { row =>
>>   val fieldX = ... // extract something from field2
>>   (row.field1, fieldX)
>> }.filter { ... /* filter out field 3 that's not valid */ }
>> newdf.write.mode(...).parquet(destPath)
>>
>> This job worked correctly without a problem. But it doesn't seem to work 
>> (the job looks like it hangs) after adding more fields. The refactored job 
>> looks as below
>> ...
>> val newdf = spark.sql("select field1, field2, ... fieldN from mytable")
>>   .as[MyCaseClassWithMoreFields].map { row =>
>> ...
>> NewCaseClassWithMoreFields(...) // all fields plus fieldX
>> }.filter { ... }
>> newdf.write.mode(...).parquet(destPath)
>>
>> Basically, the job extracts some info from one of the fields in the db 
>> table, appends that newly extracted field to the original row, and then 
>> dumps the whole new table to parquet.
>>
>> new field + (original field1 + ... + original fieldN)
>> ...
>> ...
>>
>> The number of records loaded by Spark SQL into the job is around 8MM, the 
>> same as before the refactoring, but when the refactored job runs, it 
>> appears to hang without progress. The only output on the console is (there 
>> is no crash, no exception thrown)
>>
>> WARN  HeartbeatReceiver:66 - Removing executor driver with no recent 
>> heartbeats: 137128 ms exceeds timeout 12 ms
>>
>> Memory in top command looks like
>>
>> VIRT     RES     SHR    %CPU   %MEM
>> 15.866g  8.001g  41.4m  740.3  25.6
>>
>> The command used to  submit spark job is
>>
>> spark-submit --class ... --master local[*] --driver-memory 10g 
>> --executor-memory 10g ... --files ... --driver-class-path ...  ...
>>
>> How can I debug or check which part of my code might cause the problem (so I 
>> can improve it)?
>>
>> Thanks

How to debug Spark job

2018-09-07 Thread James Starks
I have a Spark job that reads from a postgresql (v9.5) table and writes the 
result to parquet. The code flow is not complicated, basically

case class MyCaseClass(field1: String, field2: String)
val df = spark.read.format("jdbc")...load()
df.createOrReplaceTempView(...)
val newdf = spark.sql("select field1, field2 from mytable")
  .as[MyCaseClass].map { row =>
  val fieldX = ... // extract something from field2
  (row.field1, fieldX)
}.filter { ... /* filter out field 3 that's not valid */ }
newdf.write.mode(...).parquet(destPath)

This job worked correctly without a problem. But it doesn't seem to work (the 
job looks like it hangs) after adding more fields. The refactored job looks as 
below
...
val newdf = spark.sql("select field1, field2, ... fieldN from mytable")
  .as[MyCaseClassWithMoreFields].map { row =>
...
NewCaseClassWithMoreFields(...) // all fields plus fieldX
}.filter { ... }
newdf.write.mode(...).parquet(destPath)

Basically, the job extracts some info from one of the fields in the db table, 
appends that newly extracted field to the original row, and then dumps the 
whole new table to parquet.

new field + (original field1 + ... + original fieldN)
...
...

The number of records loaded by Spark SQL into the job is around 8MM, the same 
as before the refactoring, but when the refactored job runs, it appears to hang 
without progress. The only output on the console is (there is no crash, no 
exception thrown)

WARN  HeartbeatReceiver:66 - Removing executor driver with no recent 
heartbeats: 137128 ms exceeds timeout 12 ms

Memory in top command looks like

VIRT     RES     SHR    %CPU   %MEM
15.866g  8.001g  41.4m  740.3  25.6

The command used to  submit spark job is

spark-submit --class ... --master local[*] --driver-memory 10g 
--executor-memory 10g ... --files ... --driver-class-path ...  ...

How can I debug or check which part of my code might cause the problem (so I 
can improve it)?

Thanks

Re: Pass config file through spark-submit

2018-08-17 Thread James Starks
I got it working by accident, though I don't thoroughly understand why (as far 
as I know, the settings let the executors refer to the conf file after it is 
copied to the executors' working dir). Basically it takes a combination of the 
parameters --conf, --files, and --driver-class-path, rather than any single one.

spark-submit --class pkg.to.MyApp --master local[*] --conf 
"spark.executor.extraClassPath=-Dconfig.file=" --files 
 --driver-class-path ""

--conf passes the conf file name, e.g. myfile.conf, as part of the Spark 
executor class path directive.

--files passes the conf file relative to the context root, e.g. when executing 
under dir , which contains folders such as conf, logs, work and so on. The conf 
file, i.e. myfile.conf, is located under the conf folder.

--driver-class-path points to the conf directory with an absolute path.


‐‐‐ Original Message ‐‐‐
On August 17, 2018 3:00 AM, yujhe.li  wrote:

> So can you read the file on executor side?
> I think the file passed by --files my.app.conf would be added under
> classpath, and you can use it directly.

Pass config file through spark-submit

2018-08-16 Thread James Starks
I have a config file on the local file system that uses the Typesafe Config 
library, and I want to pass it through spark-submit so that my Spark program 
can read custom parameters. For instance,

my.app {
  db {
    host = domain.cc
    port = 1234
    db = dbname
    user = myuser
    passwd = mypass
  }
}

Spark submit code looks like

spark-submit --class "my.app.Sample" --master local[*] --conf 
"spark.executor.extraJavaOptions=-Dconfig.file=/path/to/conf/myapp.conf" 
/path/to/my-app.jar

But the program cannot read parameters such as db, user, host, and so on from 
my conf file.

Passing with --files /path/to/myapp.conf doesn't work either.

What is the correct way to submit that kind of conf file so that my spark job 
can read customized parameters from there?

Thanks
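
A hedged sketch of the reading side, assuming the Typesafe Config library 
(com.typesafe.config) is on the classpath. AppConfig and DbSettings are 
hypothetical names; the keys are the ones from the sample file above. Note 
that with --master local[*] everything runs inside the driver JVM, so, as far 
as I understand, a -Dconfig.file set through spark.executor.extraJavaOptions 
would not reach it. Parsing the file explicitly avoids relying on that property:

```scala
import java.io.File
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical holder for the db settings shown in the sample file.
final case class DbSettings(host: String, port: Int, db: String,
                            user: String, passwd: String)

object AppConfig {
  // Parses an explicit file and falls back to the default load() chain,
  // which also honours -Dconfig.file when that system property is set.
  def fromFile(path: String): Config =
    ConfigFactory.parseFile(new File(path))
      .withFallback(ConfigFactory.load())
      .resolve()

  // Extracts the my.app.db block into a typed value.
  def dbSettings(config: Config): DbSettings = {
    val db = config.getConfig("my.app.db")
    DbSettings(
      db.getString("host"),
      db.getInt("port"),
      db.getString("db"),
      db.getString("user"),
      db.getString("passwd"))
  }
}
```

For a file shipped with --files, the local path could be looked up with 
org.apache.spark.SparkFiles.get("myapp.conf") once the SparkSession exists and 
then passed to AppConfig.fromFile.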

Data source jdbc does not support streamed reading

2018-08-08 Thread James Starks
Now my Spark job can perform SQL operations against a database table. Next I 
want to combine that with a streaming context, so I switched to the 
readStream() function. But after job submission, Spark throws

Exception in thread "main" java.lang.UnsupportedOperationException: Data 
source jdbc does not support streamed reading

It looks like the jdbc source doesn't support streaming when used as 
sparkSession.readStream.format("jdbc")...

val sparkSession = SparkSession.builder().appName("my-test").getOrCreate()
import sparkSession.implicits._
val df = sparkSession.readStream.format("jdbc")...load()
// other operations against df

I checked the example at 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

and also searched the internet, but I don't see any examples close to what I 
need. Are there any pointers, docs, or code snippets that illustrate this?

Thanks
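
The exception says the jdbc source has no streaming reader. One possible 
workaround, not taken from this thread, is to poll with ordinary batch reads 
and remember the highest key seen so far. The sketch below is plain Scala and 
everything in it is hypothetical: Record, IncrementalPoll, the table and column 
names, and the assumption that the table has an increasing numeric id. The 
fetch function stands in for a batch read such as 
sparkSession.read.format("jdbc").option("dbtable", ...).load().

```scala
// Hypothetical row shape: an increasing id plus a payload.
final case class Record(id: Long, payload: String)

object IncrementalPoll {
  // Subquery for the JDBC "dbtable" option: only rows newer than lastId.
  def dbtableFor(table: String, idColumn: String, lastId: Long): String =
    s"(SELECT * FROM $table WHERE $idColumn > $lastId) AS incremental"

  // One polling step: fetch the new rows and advance the watermark.
  def step(table: String,
           idColumn: String,
           lastId: Long,
           fetch: String => Seq[Record]): (Long, Seq[Record]) = {
    val rows = fetch(dbtableFor(table, idColumn, lastId))
    val nextId = if (rows.isEmpty) lastId else rows.map(_.id).max
    (nextId, rows)
  }
}
```

Calling step repeatedly on a timer, feeding each returned watermark into the 
next call, gives a simple micro-batch loop over new rows.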

Re: Newbie question on how to extract column value

2018-08-07 Thread James Starks
Because of some legacy issues I can't immediately upgrade the Spark version. 
But, following the suggestion, I tried filtering the data before loading it 
into Spark with

 val df = sparkSession.read.format("jdbc").option(...).option("dbtable", 
"(select .. from ... where url <> '') table_name").load()
 df.createOrReplaceTempView("new_table")

Then performing the custom operation does the trick.

sparkSession.sql("select id, url from new_table").as[(String, String)].map 
{ case (id, url) =>
   val derived_data = ... // operation on url
   (id, derived_data)
}.show()

Thanks for the advice, it's really helpful!

‐‐‐ Original Message ‐‐‐
On August 7, 2018 5:33 PM, Gourav Sengupta  wrote:

> Hi James,
>
> It is always advisable to use the latest Spark version. That said, can you 
> please give DataFrames and UDFs a try if possible? I think that would be a 
> much more scalable way to address the issue.
>
> Also, where possible, it is always advisable to filter the data before 
> fetching it into Spark.
>
> Thanks and Regards,
> Gourav
>
> On Tue, Aug 7, 2018 at 4:09 PM, James Starks  
> wrote:
>
>> I am very new to Spark. I just successfully set up Spark SQL connecting to a 
>> postgresql database, and am able to display a table with the code
>>
>> sparkSession.sql("SELECT id, url from table_a where col_b <> '' ").show()
>>
>> Now I want to perform filter and map functions on col_b value. In plain Scala 
>> it would be something like
>>
>> Seq((1, "http://a.com/a"), (2, "http://b.com/b"), (3, "unknown")).filter 
>> { case (_, url) => isValid(url) }.map { case (id, url)  => (id, pathOf(url)) 
>> }
>>
>> where filter removes invalid URLs, and map then turns (id, url) into (id, 
>> path of url).
>>
>> However, when applying this concept to Spark SQL with the code snippet
>>
>> sparkSession.sql("...").filter(isValid($"url"))
>>
>> the compiler complains about a type mismatch because $"url" is of type 
>> ColumnName. How can I extract the column value, i.e. http://..., for the 
>> column url in order to perform the filter function?
>>
>> Thanks
>>
>> Java 1.8.0
>> Scala 2.11.8
>> Spark 2.1.0

Newbie question on how to extract column value

2018-08-07 Thread James Starks
I am very new to Spark. I just successfully set up Spark SQL connecting to a 
postgresql database, and am able to display a table with the code

sparkSession.sql("SELECT id, url from table_a where col_b <> '' ").show()

Now I want to perform filter and map functions on col_b value. In plain Scala 
it would be something like

Seq((1, "http://a.com/a"), (2, "http://b.com/b"), (3, "unknown")).filter { 
case (_, url) => isValid(url) }.map { case (id, url)  => (id, pathOf(url)) }

where filter removes invalid URLs, and map then turns (id, url) into (id, path 
of url).

However, when applying this concept to Spark SQL with the code snippet

sparkSession.sql("...").filter(isValid($"url"))

the compiler complains about a type mismatch because $"url" is of type 
ColumnName. How can I extract the column value, i.e. http://..., for the 
column url in order to perform the filter function?

Thanks

Java 1.8.0
Scala 2.11.8
Spark 2.1.0
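
A plain-Scala sketch of the two helpers the pipeline above relies on. isValid 
and pathOf are named in the post but never defined, so the bodies here (built 
on java.net.URI) are assumptions, as is the UrlHelpers object and the choice to 
return the path as an Option.

```scala
import java.net.URI
import scala.util.Try

object UrlHelpers {
  // Parses the string and keeps it only if it has both a scheme and a host.
  private def parse(url: String): Option[URI] =
    Try(new URI(url)).toOption
      .filter(u => u.getScheme != null && u.getHost != null)

  // True for strings such as "http://a.com/a", false for "unknown".
  def isValid(url: String): Boolean = parse(url).isDefined

  // Path component of a valid URL, e.g. "/a"; None for invalid input.
  def pathOf(url: String): Option[String] = parse(url).map(_.getPath)
}
```

On the Spark side, functions like these operate on values rather than on 
Column objects, so one option is a typed Dataset, for example 
.as[(Int, String)].filter { case (_, url) => UrlHelpers.isValid(url) }; another 
is wrapping them with udf from org.apache.spark.sql.functions.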

RE: Any NLP library for sentiment analysis in Spark?

2017-04-11 Thread Gabriel James
Me too. Experiences and recommendations please.

Gabriel

From: Kevin Wang [mailto:buz...@gmail.com] 
Sent: Wednesday, April 12, 2017 6:11 AM
To: Alonso Isidoro Roman 
Cc: Gaurav1809 ; user@spark.apache.org
Subject: Re: Any NLP library for sentiment analysis in Spark?

I am also interested in this topic.  Anything else anyone can recommend?  
Thanks.

Best,

Kevin 

On Tue, Apr 11, 2017 at 5:00 AM, Alonso Isidoro Roman wrote:

I have not used it yet, but this library looks promising:

https://github.com/databricks/spark-corenlp

Alonso Isidoro Roman

about.me/alonso.isidoro.roman

2017-04-11 11:02 GMT+02:00 Gaurav1809:

Hi All,

I need to determine the sentiment of a given document (statement, paragraph, etc.).
Is there any NLP library available with Apache Spark that I can use here?

Any other pointers towards this would be highly appreciated.

Thanks in advance.
Gaurav Pandya

Anyone attending spark summit?

2016-10-11 Thread Andrew James
Hey, I just found a promo code for Spark Summit Europe that saves 20%. It’s
"Summit16" -  I love Brussels and just registered! Who’s coming with me to
get their Spark on?!



Cheers,

Andrew


Re: Spark, Scala, and DNA sequencing

2016-07-25 Thread James McCabe

Thanks Ofir and Sean,

I'm aware of AMPLab's ADAM. Spark is bringing down the cost of genome 
analysis to consumer level. The potential for the future of medicine is 
exciting indeed.


I normally do Scala consulting which keeps me too busy, but recently I 
finally got some spare time to look into interesting open-source 
projects like this.


James


On 24/07/16 09:09, Sean Owen wrote:

Also also, you may be interested in GATK, built on Spark, for genomics:
https://github.com/broadinstitute/gatk


On Sun, Jul 24, 2016 at 7:56 AM, Ofir Manor <ofir.ma...@equalum.io> wrote:

Hi James,
BTW - if you are into analyzing DNA with Spark, you may also be interested
in ADAM:
https://github.com/bigdatagenomics/adam
 http://bdgenomics.org/

Ofir Manor

Co-Founder & CTO | Equalum

Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io


On Fri, Jul 22, 2016 at 10:31 PM, James McCabe <ja...@oranda.com> wrote:

Hi!

I hope this may be of use/interest to someone:

Spark, a Worked Example: Speeding Up DNA Sequencing


http://scala-bility.blogspot.nl/2016/07/spark-worked-example-speeding-up-dna.html

James



Spark, Scala, and DNA sequencing

2016-07-22 Thread James McCabe

Hi!

I hope this may be of use/interest to someone:

Spark, a Worked Example: Speeding Up DNA Sequencing

http://scala-bility.blogspot.nl/2016/07/spark-worked-example-speeding-up-dna.html 



James



Re: dataframe udf function will be executed twice when filter on new column created by withColumn

2016-05-11 Thread James Hammerton
This may be related to: https://issues.apache.org/jira/browse/SPARK-13773

Regards,

James

On 11 May 2016 at 15:49, Ted Yu <yuzhih...@gmail.com> wrote:

> In master branch, behavior is the same.
>
> Suggest opening a JIRA if you haven't done so.
>
> On Wed, May 11, 2016 at 6:55 AM, Tony Jin <linbojin...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I have a problem with Spark DataFrames. My Spark version is 1.6.1.
>> Basically, I used a udf and df.withColumn to create a "new" column, and
>> then I filter the values on this new column and call show (an action). I see
>> that the udf function (which is used by withColumn to create the new column)
>> is called twice (duplicated). If I filter on the "old" column, the udf only
>> runs once, which is expected. I attached the example code; lines 30~38
>> (the filteredOnNewColumnDF.show output) show the problem.
>>
>> Does anyone know the internal reason? Can you give me any advice? Thank you
>> very much.
>>
>>
>> scala> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.functions._
>>
>> scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", 
>> "b1"))).toDF("old","old1")
>> df: org.apache.spark.sql.DataFrame = [old: string, old1: string]
>>
>> scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
>> udfFunc: org.apache.spark.sql.UserDefinedFunction = 
>> UserDefinedFunction(<function1>,StringType,List(StringType))
>>
>> scala> val newDF = df.withColumn("new", udfFunc(df("old")))
>> newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: 
>> string]
>>
>> scala> newDF.show
>> running udf(a)
>> running udf(a1)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> | a1|  b1| a1|
>> +---+----+---+
>>
>>
>> scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
>> filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: 
>> string, new: string]
>>
>> scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
>> filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: 
>> string, new: string]
>>
>> scala> filteredOnNewColumnDF.show
>> running udf(a)
>> running udf(a)
>> running udf(a1)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> +---+----+---+
>>
>>
>> scala> filteredOnOldColumnDF.show
>> running udf(a)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> +---+----+---+
>>
>>
>>
>> Best wishes.
>> By Linbo
>>
>>
>


Help understanding an exception that produces multiple stack traces

2016-05-09 Thread James Casiraghi
Hi,

Below is an error I got while using Spark 1.6.1 on AWS EMR 4.5.  I am trying to 
understand what exactly this error is telling me.  I see the exception, then 
what I assume is the plan being executed, then the resulting stack trace, 
followed by two "caused by" stack traces, and then a driver stack trace followed 
by the two "caused by" stack traces repeated.  I am used to errors that only 
produce one stack trace; can someone explain why I am getting 6 stack traces (4 
unique ones)?  Should I focus on one of these stack traces more than the 
others?  I believe that there was one error and then a sort of cascade of errors 
across workers/driver.  I am just not sure which is the originating error.  

I assume I am misreading this particular stack trace, because the first part 
confuses me.  I thought that persist calls do not actually trigger work because 
of lazy evaluation, and that only actions do, but the initial stack trace seems 
to show a persist call with underlying work executing.  

-Thank you.
-James

Stack Trace:
An error occurred while calling o236.persist.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(nation#9998 ASC,o_year# DESC,5120), None
+- ConvertToSafe
   +- TungstenAggregate(key=[nation#9998,o_year#], 
functions=[(sum(amount#1),mode=Final,isDistinct=false)], 
output=[nation#9998,o_year#,sum_profit#9988])
  +- TungstenExchange hashpartitioning(nation#9998,o_year#,5120), None
 +- TungstenAggregate(key=[nation#9998,o_year#], 
functions=[(sum(amount#1),mode=Partial,isDistinct=false)], 
output=[nation#9998,o_year#,sum#10255])
+- Project [n_name#196 AS nation#9998,year(o_orderdate#96) AS 
o_year#,CheckOverflow((cast(CheckOverflow((cast(l_extendedprice#344 as 
decimal(13,2)) * CheckOverflow((1.00 - cast(l_discount#345 as decimal(13,2))), 
DecimalType(13,2))), DecimalType(26,4)) as decimal(27,4)) - 
cast(CheckOverflow((ps_supplycost#283 * l_quantity#343), DecimalType(25,4)) as 
decimal(27,4))), DecimalType(27,4)) AS amount#1]
   +- BroadcastHashJoin [s_nationkey#522], [n_nationkey#195], 
BuildRight
  :- Project 
[l_extendedprice#344,l_quantity#343,ps_supplycost#283,l_discount#345,s_nationkey#522,o_orderdate#96]
  :  +- SortMergeJoin [l_orderkey#339], [o_orderkey#92]
  : :- Sort [l_orderkey#339 ASC], false, 0
  : :  +- TungstenExchange 
hashpartitioning(l_orderkey#339,5120), None
  : : +- Project 
[l_orderkey#339,l_extendedprice#344,l_quantity#343,ps_supplycost#283,l_discount#345,s_nationkey#522]
  : :+- SortMergeJoin 
[l_suppkey#341,l_partkey#340], [ps_suppkey#281,ps_partkey#280]
  : :   :- Sort [l_suppkey#341 ASC,l_partkey#340 
ASC], false, 0
  : :   :  +- TungstenExchange 
hashpartitioning(l_suppkey#341,l_partkey#340,5120), None
  : :   : +- Project 
[l_orderkey#339,l_suppkey#341,l_extendedprice#344,l_quantity#343,l_partkey#340,l_discount#345,s_nationkey#522]
  : :   :+- SortMergeJoin [l_suppkey#341], 
[s_suppkey#519]
  : :   :   :- Sort [l_suppkey#341 ASC], 
false, 0
  : :   :   :  +- TungstenExchange 
hashpartitioning(l_suppkey#341,5120), None
  : :   :   : +- Project 
[l_orderkey#339,l_suppkey#341,l_extendedprice#344,l_quantity#343,l_partkey#340,l_discount#345]
  : :   :   :+- SortMergeJoin 
[p_partkey#600], [l_partkey#340]
  : :   :   :   :- Sort 
[p_partkey#600 ASC], false, 0
  : :   :   :   :  +- 
TungstenExchange hashpartitioning(p_partkey#600,5120), None
  : :   :   :   : +- Project 
[p_partkey#600]
  : :   :   :   :+- Filter 
Contains(p_name#601, almond)
  : :   :   :   :   +- 
InMemoryColumnarTableScan [p_partkey#600,p_name#601], [Contains(p_name#601, 
almond)], InMemoryRelation 
[p_partkey#600,p_name#601,p_mfgr#602,p_brand#603,p_type#604,p_size#605,p_container#606,p_retailprice#607,p_comment#608],
 true, 1, StorageLevel(true, true, false, true, 1), Scan 
ParquetRelation[p_partkey#600,p_name#601,p_mfgr#602,p_brand#603,p_type#604,p_size#605,p_container#606,p_retailprice#607,p_comment#608]
 InputPaths: s3://aqapop/DataAlgebraData/tpch/1000G/5120/part.parquet, None
  : :   :   :   +- Sort 
[l_partkey#340 ASC], false, 0
  : :   :   :  +- 
TungstenExchange hashpartitioning

Re: Error from reading S3 in Scala

2016-05-04 Thread James Hammerton
On 3 May 2016 at 17:22, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi,
>
> The best thing to do is start the EMR clusters with proper permissions in
> the roles that way you do not need to worry about the keys at all.
>
> Another thing, why are we using s3a:// instead of s3:// ?
>

Probably because of what's said about s3:// and s3n:// here (which is why I
use s3a://):

https://wiki.apache.org/hadoop/AmazonS3

Regards,

James


> Besides that you can increase s3 speeds using the instructions mentioned
> here:
> https://aws.amazon.com/blogs/aws/aws-storage-update-amazon-s3-transfer-acceleration-larger-snowballs-in-more-regions/
>
>
> Regards,
> Gourav
>
> On Tue, May 3, 2016 at 12:04 PM, Steve Loughran <ste...@hortonworks.com>
> wrote:
>
>> don't put your secret in the URI, it'll only creep out in the logs.
>>
>> Use the specific properties covered in
>> http://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html,
>> which you can set in your spark context by prefixing them with spark.hadoop.
>>
>> you can also set the env vars, AWS_ACCESS_KEY_ID and
>> AWS_SECRET_ACCESS_KEY; SparkEnv will pick these up and set the relevant
>> spark context keys for you
>>
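
One way to follow the advice above is to keep the keys out of the URI entirely and pass them as "spark.hadoop."-prefixed properties. The sketch below is only illustrative: the object and method names are made up for this example, the property names fs.s3a.access.key and fs.s3a.secret.key are the ones listed in the hadoop-aws documentation linked above, and it assumes the keys are available in the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.

```scala
import java.net.URI

// Hypothetical helper, not part of Spark or Hadoop.
object S3aCredentials {
  // S3A property names as documented for the hadoop-aws module.
  val AccessKeyProp = "fs.s3a.access.key"
  val SecretKeyProp = "fs.s3a.secret.key"

  /** Prefix the Hadoop properties with "spark.hadoop." so they can be set on the Spark configuration. */
  def sparkProps(accessKey: String, secretKey: String): Map[String, String] =
    Map(
      s"spark.hadoop.$AccessKeyProp" -> accessKey,
      s"spark.hadoop.$SecretKeyProp" -> secretKey)

  /** Build the properties from the AWS environment variables, if both are set. */
  def fromEnv(env: Map[String, String] = sys.env): Option[Map[String, String]] =
    for {
      accessKey <- env.get("AWS_ACCESS_KEY_ID")
      secretKey <- env.get("AWS_SECRET_ACCESS_KEY")
    } yield sparkProps(accessKey, secretKey)

  /** The URI carries only the bucket and object path, never the secret. */
  def s3aUri(bucket: String, path: String): URI =
    URI.create(s"s3a://$bucket/$path")
}
```

The resulting map could then be applied with something like new SparkConf().setAll(props) before the context is created, and the file opened as s3a://graphclustering/config.properties with no credentials embedded in the path.
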
>>
>> On 3 May 2016, at 01:53, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
>>
>> Hi All,
>>
>> I am using Eclipse with Maven for developing Spark applications. I get an
>> error when reading from S3 in Scala, but the same read works fine in Java
>> when I run both in the same project in Eclipse. The Scala/Java code and the
>> error are as follows.
>>
>>
>> Scala
>>
>> val uri = URI.create("s3a://" + key + ":" + seckey + "@" +
>> "graphclustering/config.properties");
>> val pt = new Path("s3a://" + key + ":" + seckey + "@" +
>> "graphclustering/config.properties");
>> val fs = FileSystem.get(uri,ctx.hadoopConfiguration);
>> val  inputStream:InputStream = fs.open(pt);
>>
>> Exception: on aws-java-1.7.4 and hadoop-aws-2.6.1
>>
>> Exception in thread "main"
>> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
>> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
>> 8A56DC7BF0BFF09A), S3 Extended Request ID
>>
>> at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(
>> AmazonHttpClient.java:1160)
>>
>> at com.amazonaws.http.AmazonHttpClient.executeOneRequest(
>> AmazonHttpClient.java:748)
>>
>> at com.amazonaws.http.AmazonHttpClient.executeHelper(
>> AmazonHttpClient.java:467)
>>
>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)
>>
>> at com.amazonaws.services.s3.AmazonS3Client.invoke(
>> AmazonS3Client.java:3785)
>>
>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
>> AmazonS3Client.java:1050)
>>
>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
>> AmazonS3Client.java:1027)
>>
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(
>> S3AFileSystem.java:688)
>>
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:222)
>>
>> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
>>
>> at com.news.report.graph.GraphCluster$.main(GraphCluster.scala:53)
>>
>> at com.news.report.graph.GraphCluster.main(GraphCluster.scala)
>>
>> 16/05/03 10:49:17 INFO SparkContext: Invoking stop() from shutdown hook
>>
>> 16/05/03 10:49:17 INFO SparkUI: Stopped Spark web UI at
>> http://10.65.80.125:4040
>>
>> 16/05/03 10:49:17 INFO MapOutputTrackerMasterEndpoint:
>> MapOutputTrackerMasterEndpoint stopped!
>>
>> 16/05/03 10:49:17 INFO MemoryStore: MemoryStore cleared
>>
>> 16/05/03 10:49:17 INFO BlockManager: BlockManager stopped
>>
>> Exception: on aws-java-1.7.4 and hadoop-aws-2.7.2
>>
>> 16/05/03 10:23:40 INFO Slf4jLogger: Slf4jLogger started
>>
>> 16/05/03 10:23:40 INFO Remoting: Starting remoting
>>
>> 16/05/03 10:23:40 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp://sparkDriverActorSystem@10.65.80.125:61860]
>>
>> 16/05/03 10:23:40 INFO Utils: Successfully started service
>> 'sparkDriverActorSystem' on port 61860.
>>
>> 16/05/03 10:23:40 INFO SparkEnv: Registering MapOutputTracker
>>
>> 16/05/03 10:23:40 INFO SparkEnv: Registering BlockManagerMaster
>>
>> 16/05/03 10:23:40 INFO DiskBlockManager: Created local directory at
>> /private/var/folders/sc/tdmkbvr170

Will nested field performance improve?

2016-04-15 Thread James Aley
Hello,

I'm trying to make a call on whether my team should invest time adding a
step to "flatten" our schema as part of our ETL pipeline to improve
performance of interactive queries.

Our data start out life as Avro before being converted to Parquet, and so
we follow the Avro idioms of creating our own types to reduce boilerplate
in many areas. For example, every record we define has a "metadata" struct
field with all the fields that are common to all records as part of the
system design. Those fields are very common, and so virtually all queries
need to access them. As a result, nearly all of our queries don't see the
best performance we could be seeing in Spark SQL, etc.

So my question - is this just inherently the way it is, or do we expect
future releases will put them on a par with flat fields? The reason I ask
is that I've actually seen similar differences in performance with Presto
too. In benchmarks for both Spark and Presto, I generally see queries
working on flat fields run 5-6x faster than queries doing the same thing on
a nested field.

If we expect fields nested in structs to always be much slower than flat
fields, then I would be keen to address that in our ETL pipeline with a
flattening step. If it's a known issue that we expect will be fixed in
upcoming releases, I'll hold off.

Any advice greatly appreciated!

Thanks,

James.
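
For reference, if the flattening step does turn out to be worthwhile, it might look something like the sketch below. This is an assumption about how such a step could be written, not something taken from our pipeline: the object and method names are invented, nested field names are joined with underscores (so metadata.id becomes metadata_id), and only struct fields are flattened (arrays and maps are left alone). It says nothing about how much faster the flattened data would be.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Hypothetical helper for an ETL flattening step.
object FlattenStructs {

  /** Dotted paths of all leaf (non-struct) fields, e.g. "metadata.id". */
  def leafPaths(schema: StructType, prefix: String = ""): Seq[String] =
    schema.fields.toSeq.flatMap { field =>
      val path = if (prefix.isEmpty) field.name else s"$prefix.${field.name}"
      field.dataType match {
        case nested: StructType => leafPaths(nested, path) // recurse into structs
        case _                  => Seq(path)               // leaf column
      }
    }

  /** Select every leaf as a top-level column, replacing '.' with '_' in its name. */
  def flatten(df: DataFrame): DataFrame =
    df.select(leafPaths(df.schema).map(p => col(p).as(p.replace('.', '_'))): _*)
}
```

The flattened DataFrame could then be written out to Parquet as usual, e.g. FlattenStructs.flatten(df).write.parquet(path), and the same benchmark queries rerun against it.
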


Re: ML Random Forest Classifier

2016-04-13 Thread James Hammerton
Hi Ashic,

Unfortunately I don't know how to work around that - I suggested this line
as it looked promising (I had considered it once before deciding to use a
different algorithm) but I never actually tried it.

Regards,

James

On 13 April 2016 at 02:29, Ashic Mahtab <as...@live.com> wrote:

> It looks like the issue is around impurity stats. After converting an rf
> model to old, and back to new (without disk storage or anything), and
> specifying the same num of features, same categorical features map, etc.,
> DecisionTreeClassifier::predictRaw throws a null pointer exception here:
>
>   override protected def predictRaw(features: Vector): Vector = {
>     Vectors.dense(rootNode.predictImpl(features).impurityStats.stats.clone())
>   }
>
> It appears impurityStats is always null (even though impurity does have a
> value). Any known workarounds? It's looking like I'll have to revert to
> using mllib instead :(
>
> -Ashic.
>
> --
> From: as...@live.com
> To: ja...@gluru.co
> CC: user@spark.apache.org
> Subject: RE: ML Random Forest Classifier
> Date: Wed, 13 Apr 2016 02:20:53 +0100
>
>
> I managed to get to the map using MetadataUtils (it's a private ml
> package). There are still some issues, around feature names, etc. Trying to
> pin them down.
>
> --
> From: as...@live.com
> To: ja...@gluru.co
> CC: user@spark.apache.org
> Subject: RE: ML Random Forest Classifier
> Date: Wed, 13 Apr 2016 00:50:31 +0100
>
> Hi James,
> Following on from the previous email, is there a way to get the
> categoricalFeatures of a Spark ML Random Forest? Essentially something I
> can pass to
>
> RandomForestClassificationModel.fromOld(oldModel, parent,
> categoricalFeatures, numClasses, numFeatures)
>
> I could construct it by hand, but I was hoping for a more automated way of
> getting the map. Since the trained model already knows about the value,
> perhaps it's possible to grab it for storage?
>
> Thanks,
> Ashic.
>
> --
> From: as...@live.com
> To: ja...@gluru.co
> CC: user@spark.apache.org
> Subject: RE: ML Random Forest Classifier
> Date: Mon, 11 Apr 2016 23:21:53 +0100
>
> Thanks, James. That looks promising.
>
> --
> Date: Mon, 11 Apr 2016 10:41:07 +0100
> Subject: Re: ML Random Forest Classifier
> From: ja...@gluru.co
> To: as...@live.com
> CC: user@spark.apache.org
>
> To add a bit more detail perhaps something like this might work:
>
> package org.apache.spark.ml
>
>
> import org.apache.spark.ml.classification.RandomForestClassificationModel
> import org.apache.spark.ml.classification.DecisionTreeClassificationModel
> import org.apache.spark.ml.classification.LogisticRegressionModel
> import org.apache.spark.mllib.tree.model.{ RandomForestModel =>
> OldRandomForestModel }
> import org.apache.spark.ml.classification.RandomForestClassifier
>
>
> object RandomForestModelConverter {
>
>
>   def fromOld(oldModel: OldRandomForestModel, parent:
> RandomForestClassifier = null,
> categoricalFeatures: Map[Int, Int], numClasses: Int, numFeatures: Int
> = -1): RandomForestClassificationModel = {
> RandomForestClassificationModel.fromOld(oldModel, parent,
> categoricalFeatures, numClasses, numFeatures)
>   }
>
>
>   def toOld(newModel: RandomForestClassificationModel):
> OldRandomForestModel = {
> newModel.toOld
>   }
> }
>
>
> Regards,
>
> James
>
>
> On 11 April 2016 at 10:36, James Hammerton <ja...@gluru.co> wrote:
>
> There are methods for converting the dataframe based random forest models
> to the old RDD based models and vice versa. Perhaps using these will help
> given that the old models can be saved and loaded?
>
> In order to use them however you will need to write code in the
> org.apache.spark.ml package.
>
> I've not actually tried doing this myself but it looks as if it might work.
>
> Regards,
>
> James
>
> On 11 April 2016 at 10:29, Ashic Mahtab <as...@live.com> wrote:
>
> Hello,
> I'm trying to save a pipeline with a random forest classifier. If I try to
> save the pipeline, it complains that the classifier is not Writable, and
> indeed the classifier itself doesn't have a write function. There's a pull
> request that's been merged that enables this for Spark 2.0 (any dates
> around when that'll release?). I am, however, using the Spark Cassandra
> Connector which doesn't seem to be able to create a CqlContext with spark
> 2.0 snapshot builds. Seeing that ML Lib's random forest classifier supports
> storing and loading models, is there a way to create a Spark ML pipeline in
> Spark 1.6 with a random forest classifier that'll allow me to store and
> load the model? The model takes significant amount of time to train, and I
> really don't want to have to train it every time my application launches.
>
> Thanks,
> Ashic.
>
>
>
>


Re: ML Random Forest Classifier

2016-04-11 Thread James Hammerton
To add a bit more detail perhaps something like this might work:

package org.apache.spark.ml

import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.mllib.tree.model.{ RandomForestModel => OldRandomForestModel }
import org.apache.spark.ml.classification.RandomForestClassifier

object RandomForestModelConverter {

  def fromOld(oldModel: OldRandomForestModel, parent: RandomForestClassifier = null,
    categoricalFeatures: Map[Int, Int], numClasses: Int, numFeatures: Int = -1): RandomForestClassificationModel = {
    RandomForestClassificationModel.fromOld(oldModel, parent, categoricalFeatures, numClasses, numFeatures)
  }

  def toOld(newModel: RandomForestClassificationModel): OldRandomForestModel = {
    newModel.toOld
  }
}

Regards,

James


On 11 April 2016 at 10:36, James Hammerton <ja...@gluru.co> wrote:

> There are methods for converting the dataframe based random forest models
> to the old RDD based models and vice versa. Perhaps using these will help
> given that the old models can be saved and loaded?
>
> In order to use them however you will need to write code in the
> org.apache.spark.ml package.
>
> I've not actually tried doing this myself but it looks as if it might work.
>
> Regards,
>
> James
>
> On 11 April 2016 at 10:29, Ashic Mahtab <as...@live.com> wrote:
>
>> Hello,
>> I'm trying to save a pipeline with a random forest classifier. If I try
>> to save the pipeline, it complains that the classifier is not Writable, and
>> indeed the classifier itself doesn't have a write function. There's a pull
>> request that's been merged that enables this for Spark 2.0 (any dates
>> around when that'll release?). I am, however, using the Spark Cassandra
>> Connector which doesn't seem to be able to create a CqlContext with spark
>> 2.0 snapshot builds. Seeing that ML Lib's random forest classifier supports
>> storing and loading models, is there a way to create a Spark ML pipeline in
>> Spark 1.6 with a random forest classifier that'll allow me to store and
>> load the model? The model takes significant amount of time to train, and I
>> really don't want to have to train it every time my application launches.
>>
>> Thanks,
>> Ashic.
>>
>
>


Re: ML Random Forest Classifier

2016-04-11 Thread James Hammerton
There are methods for converting the dataframe based random forest models
to the old RDD based models and vice versa. Perhaps using these will help
given that the old models can be saved and loaded?

In order to use them however you will need to write code in the
org.apache.spark.ml package.

I've not actually tried doing this myself but it looks as if it might work.

Regards,

James

On 11 April 2016 at 10:29, Ashic Mahtab <as...@live.com> wrote:

> Hello,
> I'm trying to save a pipeline with a random forest classifier. If I try to
> save the pipeline, it complains that the classifier is not Writable, and
> indeed the classifier itself doesn't have a write function. There's a pull
> request that's been merged that enables this for Spark 2.0 (any dates
> around when that'll release?). I am, however, using the Spark Cassandra
> Connector which doesn't seem to be able to create a CqlContext with spark
> 2.0 snapshot builds. Seeing that ML Lib's random forest classifier supports
> storing and loading models, is there a way to create a Spark ML pipeline in
> Spark 1.6 with a random forest classifier that'll allow me to store and
> load the model? The model takes significant amount of time to train, and I
> really don't want to have to train it every time my application launches.
>
> Thanks,
> Ashic.
>


Logistic regression throwing errors

2016-04-01 Thread James Hammerton
Hi,

On a particular .csv data set - which I can use in WEKA's logistic
regression implementation without any trouble, I'm getting errors like the
following:

16/04/01 18:04:18 ERROR LBFGS: Failure! Resetting history:
breeze.optimize.FirstOrderException: Line search failed

These errors cause the learning to fail - f1 = 0.

Anyone got any idea why this might happen?

Regards,

James


Re: Work out date column in CSV more than 6 months old (datediff or something)

2016-03-22 Thread James Hammerton
On 22 March 2016 at 10:57, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Thanks Silvio.
>
> The problem I have is that somehow string comparison does not work.
>
> Case in point
>
> val df = sqlContext.read.format("com.databricks.spark.csv")
>   .option("inferSchema", "true").option("header", "true")
>   .load("/data/stg/table2")
> val current_date = sqlContext.sql(
>   "SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy')").collect.apply(0).getString(0)
> df.filter(lit(current_date) < col("Payment date"))
>   .select(lit(current_date).alias("current_date"),
>     col("Payment date").alias("PaymentDate")).show(5)
>
>
This is doing a string comparison not a date comparison (assuming "Payment
date" is of type String).

E.g.

scala> "22/03/2016" < "24/02/2015"
res4: Boolean = true

scala> "22/03/2016" < "04/02/2015"
res5: Boolean = false

This is the correct result for a string comparison but it's not the
comparison you want.

I think you need to convert the "Payment date" with "to_date" and compare
against that.

E.g. something like: df.filter(current_date() < to_date(col("Payment
date")))
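
To make the difference concrete, here is a small sketch in plain Scala (no Spark needed) that parses the dd/MM/yyyy strings into dates before comparing them. The object and method names are invented for the example, and it assumes Java 8's java.time is available.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper: compares dd/MM/yyyy strings as dates rather than as text.
object PaymentDates {
  private val fmt = DateTimeFormatter.ofPattern("dd/MM/yyyy")

  /** Parse a dd/MM/yyyy string such as "22/03/2016". */
  def parse(s: String): LocalDate = LocalDate.parse(s, fmt)

  /** True if date a falls strictly before date b. */
  def isBefore(a: String, b: String): Boolean = parse(a).isBefore(parse(b))
}

object PaymentDatesDemo extends App {
  // String comparison looks only at the characters, so the day of month decides.
  println("22/03/2016" < "24/02/2015")
  // Date comparison takes the year and month into account.
  println(PaymentDates.isBefore("22/03/2016", "24/02/2015"))
}
```

In Spark itself the same idea means parsing the column with an explicit pattern first. As far as I know to_date() on its own expects yyyy-MM-dd style strings, so something along the lines of to_date(unix_timestamp(col("Payment date"), "dd/MM/yyyy").cast("timestamp")) may be needed; this is untested against the data in this thread.
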

Regards,

James



> It selects all the rows that are less than today's date (they are old).
>
> +------------+-----------+
> |current_date|PaymentDate|
> +------------+-----------+
> |  22/03/2016| 24/02/2014|
> |  22/03/2016| 24/03/2014|
> |  22/03/2016| 31/03/2015|
> |  22/03/2016| 28/04/2014|
> |  22/03/2016| 26/05/2014|
> +------------+-----------+
>
> I don't know why this comparison is failing. Maybe it is comparing the
> first two leftmost characters?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 22 March 2016 at 00:26, Silvio Fiorito <silvio.fior...@granturing.com>
> wrote:
>
>> There’s a months_between function you could use, as well:
>>
>> df.filter(months_between(current_date, $"Payment Date") > 6).show
>>
>> From: Mich Talebzadeh <mich.talebza...@gmail.com>
>> Date: Monday, March 21, 2016 at 5:53 PM
>> To: "user @spark" <user@spark.apache.org>
>> Subject: Work out date column in CSV more than 6 months old (datediff or
>> something)
>>
>> Hi,
>>
>> For test purposes I am reading in a simple csv file as follows:
>>
>> val df =
>> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
>> "true").option("header", "true").load("/data/stg/table2")
>> df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment
>> date: string, Net: string, VAT: string, Total: string]
>>
>> For this work I am interested in column "Payment Date" > 6 months old
>> from today
>>
>> Data is stored in the following format for that column
>>
>> scala> df.select("Payment date").take(2)
>> res40: Array[org.apache.spark.sql.Row] = Array([10/02/2014], [17/02/2014])
>>
>> stored as 'dd/MM/yyyy'
>>
>> The current time I get as
>>
>> scala> val today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
>> 'dd/MM/yyyy') ").collect.apply(0).getString(0)
>> today: String = 21/03/2016
>>
>>
>> So I want to filter the csv file
>>
>> scala>  df.filter(col("Payment date") < lit(today)).show(2)
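>> +--------------+------------+---------+-----+---------+
>> |Invoice Number|Payment date|      Net|  VAT|    Total|
>> +--------------+------------+---------+-----+---------+
>> |           360|  10/02/2014|?2,500.00|?0.00|?2,500.00|
>> |           361|  17/02/2014|?2,500.00|?0.00|?2,500.00|
>> +--------------+------------+---------+-----+---------+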
>>
>>
>> However, I want to use datediff() function here not just < today!
>>
>>
>> Obviously one can store the file as a table and use SQL on it. However, I
>> want to see if there are other ways using fp.
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>
>


Re: Find all invoices more than 6 months from csv file

2016-03-22 Thread James Hammerton
On 21 March 2016 at 17:57, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> Hi,
>
> For test purposes I am reading in a simple csv file as follows:
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
> df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment
> date: string, Net: string, VAT: string, Total: string]
>
> For this work I am interested in column "Payment Date" > 6 months old from
> today
>
> Data is stored in the following format for that column
>
> scala> df.select("Payment date").take(2)
> res40: Array[org.apache.spark.sql.Row] = Array([10/02/2014], [17/02/2014])
>
> stored as 'dd/MM/yyyy'
>
> The current time I get as
>
> scala> val today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
> 'dd/MM/yyyy') ").collect.apply(0).getString(0)
> today: String = 21/03/2016
>
>
> So I want to filter the csv file
>
> scala>  df.filter(col("Payment date") < lit(today)).show(2)
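> +--------------+------------+---------+-----+---------+
> |Invoice Number|Payment date|      Net|  VAT|    Total|
> +--------------+------------+---------+-----+---------+
> |           360|  10/02/2014|?2,500.00|?0.00|?2,500.00|
> |           361|  17/02/2014|?2,500.00|?0.00|?2,500.00|
> +--------------+------------+---------+-----+---------+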
>
>
> However, I want to use datediff() function here not just < today!
>
>
Could you not compute the date of the 6 month cut-off point and use
that in place of today?

Looking at the API I see add_months(), date_add() and date_sub() functions.
The first adds a number of months to a start date (would adding a negative
number of months to the current date work?); the latter two add or subtract
a specified number of days to/from a date. These are available from 1.5.0
onwards.

Alternatively outside of the SQL api (e.g. in a UDF) you could use
something like:

import java.util.{Calendar, Date}

val c = Calendar.getInstance()
c.setTime(new Date(System.currentTimeMillis()))
c.add(Calendar.MONTH, -6)
val date: Date = c.getTime
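
A variant of the same cut-off idea using java.time (Java 8 onwards), which also parses the dd/MM/yyyy strings so that the comparison is done on dates rather than on strings. This is a sketch only: the object and method names are invented here, and "today" is passed in explicitly so that the result is reproducible.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper for the "more than 6 months old" check.
object InvoiceAge {
  private val fmt = DateTimeFormatter.ofPattern("dd/MM/yyyy")

  /** The cut-off date: `months` months before `today`. */
  def cutoff(today: LocalDate, months: Int = 6): LocalDate =
    today.minusMonths(months.toLong)

  /** True if a dd/MM/yyyy payment date falls before the cut-off. */
  def isOlderThan(paymentDate: String, today: LocalDate, months: Int = 6): Boolean =
    LocalDate.parse(paymentDate, fmt).isBefore(cutoff(today, months))
}
```

Something like InvoiceAge.isOlderThan(_: String, LocalDate.now()) could then be wrapped in a UDF and used in df.filter(...) on the "Payment date" column.
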


Regards,

James




>
> Obviously one can store the file as a table and use SQL on it. However, I
> want to see if there are other ways using fp.
>
> Thanks
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Add org.apache.spark.mllib model .predict() method to models in org.apache.spark.ml?

2016-03-22 Thread James Hammerton
Hi,

The machine learning models in org.apache.spark.mllib have a .predict()
method that can be applied to a Vector to return a prediction.

However this method does not appear on the new models on org.apache.spark.ml
and you have to wrap up a Vector in a DataFrame to send a prediction in.
This ties you into bringing in more of Spark's code as a dependency if you
wish to embed the models in production code outside of Spark.

Also if you wish to feed predictions in one at a time in that context it
makes the process a lot slower, thus it seems to me the old models are more
amenable to being used outside of Spark than the new models at this time.

Are there any plans to add the .predict() method back to the models in the
new API?

Regards,

James


Re: best way to do deep learning on spark ?

2016-03-20 Thread James Hammerton
In the meantime there is also deeplearning4j which integrates with Spark
(for both Java and Scala): http://deeplearning4j.org/

Regards,

James

On 17 March 2016 at 02:32, Ulanov, Alexander <alexander.ula...@hpe.com>
wrote:

> Hi Charles,
>
>
>
> There is an implementation of multilayer perceptron in Spark (since 1.5):
>
>
> https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
>
>
>
> Other features such as autoencoder, convolutional layers, etc. are
> currently under development. Please refer to
> https://issues.apache.org/jira/browse/SPARK-5575
>
>
>
> Best regards, Alexander
>
>
>
> *From:* charles li [mailto:charles.up...@gmail.com]
> *Sent:* Wednesday, March 16, 2016 7:01 PM
> *To:* user <user@spark.apache.org>
> *Subject:* best way to do deep learning on spark ?
>
>
>
>
>
> Hi guys, I'm new to MLlib on Spark. After reading the documentation, it
> seems that MLlib does not support deep learning. Is there any way
> to implement deep learning on Spark?
>
>
>
> *Do I have to use a third-party package like Caffe or TensorFlow?*
>
>
>
> or
>
>
>
> *Is a deep learning module on the MLlib development plan?*
>
>
>
>
> great thanks
>
>
>
> --
>
> *--*
>
> a spark lover, a quant, a developer and a good man.
>
>
>
> http://github.com/litaotao
>


Saving the DataFrame based RandomForestClassificationModels

2016-03-18 Thread James Hammerton
Hi,

If you train a
org.apache.spark.ml.classification.RandomForestClassificationModel, you
can't save it - attempts to do so yield the following error:

16/03/18 14:12:44 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.UnsupportedOperationException:
Pipeline write will fail on this Pipeline because it contains a stage
which does not implement Writable. Non-Writable stage: rfc_704981ba3f48
of type class org.apache.spark.ml.classification.RandomForestClassifier
        at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$validateStages$1.apply(Pipeline.scala:218)
        at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$validateStages$1.apply(Pipeline.scala:215)


This appears to be a known bug:
https://issues.apache.org/jira/browse/SPARK-13784 related to
https://issues.apache.org/jira/browse/SPARK-11888

My question is whether there's a workaround, given that these bugs will
remain unresolved at least until 2.0.0.

Regards,

James


Best way to process values for key in sorted order

2016-03-15 Thread James Hammerton
Hi,

I need to process some events in a specific order based on a timestamp, for
each user in my data.

I had implemented this by using the dataframe sort method to sort by user
id and then sort by the timestamp secondarily, then do a
groupBy().mapValues() to process the events for each user.

However on re-reading the docs I see that groupByKey() does not guarantee
any ordering of the values, yet my code (which will fall over on out of
order events) seems to run OK so far, in local mode on a machine
with 8 CPUs.

I guess the easiest way to be certain would be to sort the values after the
groupByKey, but I'm wondering if using mapPartitions() to process all
entries in a partition would work, given I had pre-ordered the data?

This would require a bit more work to track when I switch from one user to
the next as I process the events, but if the original order has been
preserved on reading the events in, this should work.

Anyone know definitively if this is the case?

Regards,

James
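
For what it's worth, the "sort the values after grouping" option mentioned above can be sketched with plain Scala collections as follows. The Event case class and the object name are made up for the illustration; the real events obviously carry more than this.

```scala
// Hypothetical event type: only the fields needed for the ordering.
final case class Event(userId: String, timestamp: Long, payload: String)

object PerUserOrdering {
  /** Group events by user, then sort each user's events by timestamp.
    * The ordering is imposed after grouping, so it does not rely on the
    * grouping step preserving any input order. */
  def eventsInOrder(events: Seq[Event]): Map[String, Seq[Event]] =
    events.groupBy(_.userId).map { case (user, userEvents) =>
      user -> userEvents.sortBy(_.timestamp)
    }
}
```

On an RDD of (userId, event) pairs the equivalent would be along the lines of rdd.groupByKey().mapValues(_.toSeq.sortBy(_.timestamp)), which sorts each user's events in memory after grouping and so assumes that one user's events fit on a single executor.
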


Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-09 Thread James Hammerton
Hi Ted,

Finally got round to creating this:
https://issues.apache.org/jira/browse/SPARK-13773

I hope you don't mind me selecting you as the shepherd for this ticket.

Regards,

James


On 7 March 2016 at 17:50, James Hammerton <ja...@gluru.co> wrote:

> Hi Ted,
>
> Thanks for getting back - I realised my mistake... I was clicking the
> little drop down menu on the right hand side of the Create button (it looks
> as if it's part of the button) - when I clicked directly on the word
> "Create" I got a form that made more sense and allowed me to choose the
> project.
>
> Regards,
>
> James
>
>
> On 7 March 2016 at 13:09, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Have you tried clicking on Create button from an existing Spark JIRA ?
>> e.g.
>> https://issues.apache.org/jira/browse/SPARK-4352
>>
>> Once you're logged in, you should be able to select Spark as the Project.
>>
>> Cheers
>>
>> On Mon, Mar 7, 2016 at 2:54 AM, James Hammerton <ja...@gluru.co> wrote:
>>
>>> Hi,
>>>
>>> So I managed to isolate the bug and I'm ready to try raising a JIRA
>>> issue. I joined the Apache Jira project so I can create tickets.
>>>
>>> However when I click Create from the Spark project home page on JIRA, it
>>> asks me to click on one of the following service desks: Kylin, Atlas,
>>> Ranger, Apache Infrastructure. There doesn't seem to be an option for me to
>>> raise an issue for Spark?!
>>>
>>> Regards,
>>>
>>> James
>>>
>>>
>>> On 4 March 2016 at 14:03, James Hammerton <ja...@gluru.co> wrote:
>>>
>>>> Sure thing, I'll see if I can isolate this.
>>>>
>>>> Regards.
>>>>
>>>> James
>>>>
>>>> On 4 March 2016 at 12:24, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> If you can reproduce the following with a unit test, I suggest you
>>>>> open a JIRA.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mar 4, 2016, at 4:01 AM, James Hammerton <ja...@gluru.co> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I've come across some strange behaviour with Spark 1.6.0.
>>>>>
>>>>> In the code below, the filtering by "eventName" only seems to work if
>>>>> I call .cache on the resulting DataFrame.
>>>>>
>>>>> If I don't do this, the code crashes inside the UDF because it
>>>>> processes an event that the filter should get rid of.
>>>>>
>>>>> Any ideas why this might be the case?
>>>>>
>>>>> The code is as follows:
>>>>>
>>>>>   val df = sqlContext.read.parquet(inputPath)
>>>>>   val filtered = df.filter(df("eventName").equalTo(Created))
>>>>>   // Caching seems to be required for the filter to work
>>>>>   val extracted = extractEmailReferences(sqlContext, filtered.cache)
>>>>>   extracted.write.parquet(outputPath)
>>>>>
>>>>> where extractEmailReferences does this:
>>>>>
>>>>>   def extractEmailReferences(sqlContext: SQLContext, df: DataFrame): DataFrame = {
>>>>>     val extracted = df.select(df(EventFieldNames.ObjectId),
>>>>>       extractReferencesUDF(df(EventFieldNames.EventJson),
>>>>>         df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>>>>>
>>>>>     extracted.filter(extracted("references").notEqual("UNKNOWN"))
>>>>>   }
>>>>>
>>>>> and extractReferencesUDF:
>>>>>
>>>>>   def extractReferencesUDF = udf(extractReferences(_: String, _: String, _: String))
>>>>>
>>>>>   def extractReferences(eventJson: String, objectId: String, userId: String): String = {
>>>>>     import org.json4s.jackson.Serialization
>>>>>     import org.json4s.NoTypeHints
>>>>>     implicit val formats = Serialization.formats(NoTypeHints)
>>>>>
>>>>>     // This is where the code crashes if the .cache isn't called
>>>>>     val created = Serialization.read[GMailMessage.Created](eventJson)
>>>>>
>>>>>
>>>>>  Regards,
>>>>>
>>>>> James
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-07 Thread James Hammerton
Hi Ted,

Thanks for getting back - I realised my mistake... I was clicking the
little drop down menu on the right hand side of the Create button (it looks
as if it's part of the button) - when I clicked directly on the word
"Create" I got a form that made more sense and allowed me to choose the
project.

Regards,

James


On 7 March 2016 at 13:09, Ted Yu <yuzhih...@gmail.com> wrote:

> Have you tried clicking on Create button from an existing Spark JIRA ?
> e.g.
> https://issues.apache.org/jira/browse/SPARK-4352
>
> Once you're logged in, you should be able to select Spark as the Project.
>
> Cheers
>
> On Mon, Mar 7, 2016 at 2:54 AM, James Hammerton <ja...@gluru.co> wrote:
>
>> Hi,
>>
>> So I managed to isolate the bug and I'm ready to try raising a JIRA
>> issue. I joined the Apache Jira project so I can create tickets.
>>
>> However when I click Create from the Spark project home page on JIRA, it
>> asks me to click on one of the following service desks: Kylin, Atlas,
>> Ranger, Apache Infrastructure. There doesn't seem to be an option for me to
>> raise an issue for Spark?!
>>
>> Regards,
>>
>> James
>>
>>
>> On 4 March 2016 at 14:03, James Hammerton <ja...@gluru.co> wrote:
>>
>>> Sure thing, I'll see if I can isolate this.
>>>
>>> Regards.
>>>
>>> James
>>>
>>> On 4 March 2016 at 12:24, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> If you can reproduce the following with a unit test, I suggest you open
>>>> a JIRA.
>>>>
>>>> Thanks
>>>>
>>>> On Mar 4, 2016, at 4:01 AM, James Hammerton <ja...@gluru.co> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I've come across some strange behaviour with Spark 1.6.0.
>>>>
>>>> In the code below, the filtering by "eventName" only seems to work if I
>>>> called .cache on the resulting DataFrame.
>>>>
>>>> If I don't do this, the code crashes inside the UDF because it
>>>> processes an event that the filter should get rid of.
>>>>
>>>> Any ideas why this might be the case?
>>>>
>>>> The code is as follows:
>>>>
>>>>>   val df = sqlContext.read.parquet(inputPath)
>>>>>   val filtered = df.filter(df("eventName").equalTo(Created))
>>>>>   val extracted = extractEmailReferences(sqlContext,
>>>>> filtered.cache) // Caching seems to be required for the filter to work
>>>>>   extracted.write.parquet(outputPath)
>>>>
>>>>
>>>> where extractEmailReferences does this:
>>>>
>>>>>
>>>>
>>>> def extractEmailReferences(sqlContext: SQLContext, df: DataFrame):
>>>>> DataFrame = {
>>>>
>>>> val extracted = df.select(df(EventFieldNames.ObjectId),
>>>>
>>>>   extractReferencesUDF(df(EventFieldNames.EventJson),
>>>>> df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>>>>
>>>>
>>>>> extracted.filter(extracted("references").notEqual("UNKNOWN"))
>>>>
>>>>   }
>>>>
>>>>
>>>> and extractReferencesUDF:
>>>>
>>>>> def extractReferencesUDF = udf(extractReferences(_: String, _: String,
>>>>> _: String))
>>>>
>>>> def extractReferences(eventJson: String, objectId: String, userId:
>>>>> String): String = {
>>>>> import org.json4s.jackson.Serialization
>>>>> import org.json4s.NoTypeHints
>>>>> implicit val formats = Serialization.formats(NoTypeHints)
>>>>>
>>>>> val created = Serialization.read[GMailMessage.Created](eventJson)
>>>>> // This is where the code crashes if the .cache isn't called
>>>>
>>>>
>>>>  Regards,
>>>>
>>>> James
>>>>
>>>>
>>>
>>
>


Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-07 Thread James Hammerton
Hi,

So I managed to isolate the bug and I'm ready to try raising a JIRA issue.
I joined the Apache Jira project so I can create tickets.

However when I click Create from the Spark project home page on JIRA, it
asks me to click on one of the following service desks: Kylin, Atlas,
Ranger, Apache Infrastructure. There doesn't seem to be an option for me to
raise an issue for Spark?!

Regards,

James


On 4 March 2016 at 14:03, James Hammerton <ja...@gluru.co> wrote:

> Sure thing, I'll see if I can isolate this.
>
> Regards.
>
> James
>
> On 4 March 2016 at 12:24, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> If you can reproduce the following with a unit test, I suggest you open a
>> JIRA.
>>
>> Thanks
>>
>> On Mar 4, 2016, at 4:01 AM, James Hammerton <ja...@gluru.co> wrote:
>>
>> Hi,
>>
>> I've come across some strange behaviour with Spark 1.6.0.
>>
>> In the code below, the filtering by "eventName" only seems to work if I
>> called .cache on the resulting DataFrame.
>>
>> If I don't do this, the code crashes inside the UDF because it processes
>> an event that the filter should get rid of.
>>
>> Any ideas why this might be the case?
>>
>> The code is as follows:
>>
>>>   val df = sqlContext.read.parquet(inputPath)
>>>   val filtered = df.filter(df("eventName").equalTo(Created))
>>>   val extracted = extractEmailReferences(sqlContext, filtered.cache)
>>> // Caching seems to be required for the filter to work
>>>   extracted.write.parquet(outputPath)
>>
>>
>> where extractEmailReferences does this:
>>
>>>
>>
>> def extractEmailReferences(sqlContext: SQLContext, df: DataFrame):
>>> DataFrame = {
>>
>> val extracted = df.select(df(EventFieldNames.ObjectId),
>>
>>   extractReferencesUDF(df(EventFieldNames.EventJson),
>>> df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>>
>>
>>> extracted.filter(extracted("references").notEqual("UNKNOWN"))
>>
>>   }
>>
>>
>> and extractReferencesUDF:
>>
>>> def extractReferencesUDF = udf(extractReferences(_: String, _: String,
>>> _: String))
>>
>> def extractReferences(eventJson: String, objectId: String, userId:
>>> String): String = {
>>> import org.json4s.jackson.Serialization
>>> import org.json4s.NoTypeHints
>>> implicit val formats = Serialization.formats(NoTypeHints)
>>>
>>> val created = Serialization.read[GMailMessage.Created](eventJson) //
>>> This is where the code crashes if the .cache isn't called
>>
>>
>>  Regards,
>>
>> James
>>
>>
>


Spark reduce serialization question

2016-03-04 Thread James Jia
I'm running a distributed KMeans algorithm with 4 executors.

I have an RDD[Data]. I use mapPartitions to run a learner on each data
partition, and then call reduce with my custom model reduce function
to combine the resulting models before starting a new iteration.

The model size is around 330 MB. I would expect the memory required
for the serialized results at the driver to be at most 2 * 330 MB
in order to reduce two models, but it looks like Spark is serializing
all of the models to the driver before reducing.

The error message says that the total size of the serialized results
is 1345.5MB, which is approximately 4 * 330 MB. I know I can set the
driver's max result size, but I just want to confirm that this is
expected behavior.
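
As a rough check of the numbers above, here is a small sketch in plain Python (not Spark code; the helper names are made up for illustration). It assumes every task result is held at the driver before merging and compares that with holding only two models at a time:

```python
# Back-of-the-envelope check; helper names are illustrative, not a Spark API.
MAX_RESULT_SIZE_MB = 1024.0   # spark.driver.maxResultSize from the error message
MODEL_MB = 330.0              # approximate serialized model size
NUM_TASKS = 4                 # number of task results named in the error message


def total_if_all_collected(num_tasks, model_mb):
    """Driver-side total when every task result arrives before merging."""
    return num_tasks * model_mb


def total_if_pairwise(model_mb):
    """Driver-side total if only two models were ever held at once."""
    return 2 * model_mb


all_collected = total_if_all_collected(NUM_TASKS, MODEL_MB)
pairwise = total_if_pairwise(MODEL_MB)
print(all_collected, all_collected > MAX_RESULT_SIZE_MB)   # 1320.0 True
print(pairwise, pairwise > MAX_RESULT_SIZE_MB)             # 660.0 False
```

The first figure is close to the 1345.5 MB in the error, which is what makes me think all four results are being collected.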

Thanks!

James

[Stage 0:==>                                              (1 + 3) / 4]
16/02/19 05:59:28 ERROR TaskSetManager: Total size of serialized results of 4 tasks (1345.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 4 tasks (1345.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1007)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
  at org.apache.spark.rdd.RDD.reduce(RDD.scala:989)
  at BIDMach.RunOnSpark$.runOnSpark(RunOnSpark.scala:50)
  ... 50 elided


Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-04 Thread James Hammerton
Sure thing, I'll see if I can isolate this.

Regards.

James

On 4 March 2016 at 12:24, Ted Yu <yuzhih...@gmail.com> wrote:

> If you can reproduce the following with a unit test, I suggest you open a
> JIRA.
>
> Thanks
>
> On Mar 4, 2016, at 4:01 AM, James Hammerton <ja...@gluru.co> wrote:
>
> Hi,
>
> I've come across some strange behaviour with Spark 1.6.0.
>
> In the code below, the filtering by "eventName" only seems to work if I
> called .cache on the resulting DataFrame.
>
> If I don't do this, the code crashes inside the UDF because it processes
> an event that the filter should get rid of.
>
> Any ideas why this might be the case?
>
> The code is as follows:
>
>>   val df = sqlContext.read.parquet(inputPath)
>>   val filtered = df.filter(df("eventName").equalTo(Created))
>>   val extracted = extractEmailReferences(sqlContext, filtered.cache)
>> // Caching seems to be required for the filter to work
>>   extracted.write.parquet(outputPath)
>
>
> where extractEmailReferences does this:
>
>>
>
> def extractEmailReferences(sqlContext: SQLContext, df: DataFrame):
>> DataFrame = {
>
> val extracted = df.select(df(EventFieldNames.ObjectId),
>
>   extractReferencesUDF(df(EventFieldNames.EventJson),
>> df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>
>
>> extracted.filter(extracted("references").notEqual("UNKNOWN"))
>
>   }
>
>
> and extractReferencesUDF:
>
>> def extractReferencesUDF = udf(extractReferences(_: String, _: String, _:
>> String))
>
> def extractReferences(eventJson: String, objectId: String, userId:
>> String): String = {
>> import org.json4s.jackson.Serialization
>> import org.json4s.NoTypeHints
>> implicit val formats = Serialization.formats(NoTypeHints)
>>
>> val created = Serialization.read[GMailMessage.Created](eventJson) //
>> This is where the code crashes if the .cache isn't called
>
>
>  Regards,
>
> James
>
>


Re: How to control the number of parquet files getting created under a partition ?

2016-03-02 Thread James Hammerton
Hi,

Based on the behaviour I've seen using parquet, the number of partitions in
the DataFrame will determine the number of files in each parquet partition.

I.e. when you use "PARTITION BY" you're actually partitioning twice, once
via the partitions spark has created internally and then again with the
partitions you specify in the "PARTITION BY" clause.

So if you have 10 partitions in your DataFrame, and save that as a parquet
file or table partitioned on a column with 3 values, you'll get up to 30
files, 10 per parquet partition.

You can reduce the number of partitions in the DataFrame by using
coalesce() before saving the data.
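
To make the arithmetic concrete, here is a minimal sketch in plain Python (not Spark code) of the behaviour I've observed. It assumes each in-memory partition writes one file per distinct partition-column value it holds; the function names and the round-robin merge are simplifications of mine, not how Spark implements coalesce:

```python
from collections import defaultdict


def count_output_files(partitions, key):
    """Count files per partition-column value, assuming each in-memory
    partition writes one file for every distinct value it contains."""
    files = defaultdict(int)
    for rows in partitions:
        for value in {key(row) for row in rows}:
            files[value] += 1
    return dict(files)


def coalesce(partitions, n):
    """Merge partitions down to n (simple round-robin, illustration only)."""
    merged = [[] for _ in range(n)]
    for i, rows in enumerate(partitions):
        merged[i % n].extend(rows)
    return merged


# 10 in-memory partitions, each holding rows for all 3 partition values.
partitions = [[("user%d" % i, value) for value in ("a", "b", "c")]
              for i in range(10)]

before = count_output_files(partitions, key=lambda row: row[1])
after = count_output_files(coalesce(partitions, 2), key=lambda row: row[1])
print(sum(before.values()))   # 30
print(after)
```

With 2 merged partitions the same data would land in 2 files per partition value instead of 10.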

Regards,

James


On 1 March 2016 at 21:01, SRK <swethakasire...@gmail.com> wrote:

> Hi,
>
> How can I control the number of parquet files getting created under a
> partition? I have my sqlContext queries to create a table and insert the
> records as follows. It seems to create around 250 parquet files under each
> partition though I was expecting that to create around 2 or 3 files. Due to
> the large number of files, it takes a lot of time to scan the records. Any
> suggestions as to how to control the number of parquet files under each
> partition would be of great help.
>
>  sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS testUserDts
> (userId STRING, savedDate STRING) PARTITIONED BY (partitioner STRING)
> stored as PARQUET LOCATION '/user/testId/testUserDts' ")
>
>   sqlContext.sql(
> """from testUserDtsTemp ps   insert overwrite table testUserDts
> partition(partitioner)  select ps.userId, ps.savedDate ,  ps.partitioner
> """.stripMargin)
>
>
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-the-number-of-parquet-files-getting-created-under-a-partition-tp26374.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Sample sql query using pyspark

2016-03-01 Thread James Barney
Maurin,

I don't know the technical reason why, but try removing the 'limit 100'
part of your query. I was trying to do something similar the other week and
what I found is that each executor doesn't necessarily get the same 100
rows. Joins would fail or result in a bunch of nulls when keys weren't
found between the slices of 100 rows.

Once I removed the 'limit ' part of my query, all the results were the
same across the board and taking samples worked again.

If the amount of data is too large, or you're trying to just test on a
smaller size, just define another table and insert only 100 rows into that
table.
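
My best guess at what is going on, as a toy sketch in plain Python (not Spark code): a limit without an explicit ordering just takes whichever rows arrive first, so two runs can see different rows. The `limit` helper here is made up for illustration:

```python
def limit(rows, n):
    """Take the first n rows in whatever order they happen to arrive."""
    return rows[:n]


rows = list(range(10))

# Two different arrival orders of the same data give different "first 3" rows.
run_a = limit(rows, 3)
run_b = limit(list(reversed(rows)), 3)

# Imposing an order first makes the result stable across arrival orders.
stable_a = limit(sorted(rows), 3)
stable_b = limit(sorted(reversed(rows)), 3)

print(run_a, run_b)
print(stable_a, stable_b)
```

Materializing the 100 rows into their own table, as suggested above, fixes the set of rows once so that later steps all see the same ones.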

I hope that helps!

On Tue, Mar 1, 2016 at 3:10 AM, Maurin Lenglart 
wrote:

> Hi,
> I am trying to get a sample of a sql query in order to make the query run
> faster.
> My query looks like this:
> SELECT `Category` as `Category`,sum(`bookings`) as
> `bookings`,sum(`dealviews`) as `dealviews` FROM groupon_dropbox WHERE
>  `event_date` >= '2015-11-14' AND `event_date` <= '2016-02-19' GROUP BY
> `Category` LIMIT 100
>
> The table is partitioned by event_date. And the code I am using is:
>  df = self.df_from_sql(sql, srcs)
>
> results = df.sample(False, 0.5).collect()
>
>  The results are a little bit different, but the execution time is almost the 
> same. Am I missing something?
>
>
> thanks
>
>


Re: How could I do this algorithm in Spark?

2016-02-24 Thread James Barney
Guillermo,
I think you're after an associative algorithm where A is ultimately
associated with D, correct? Jakob would be correct if that is a typo: a sort
would be all that is necessary in that case.

I believe you're looking for something else though, if I understand
correctly.

This seems like a similar algorithm to PageRank, no?
https://github.com/amplab/graphx/blob/master/python/examples/pagerank.py
Except you would return the "neighbor" itself, not necessarily the rank of
the page.

If you wanted to, you could use Scala and GraphX for this problem. It might
be a bit of overhead though: construct a node for each member of each tuple
with an edge between them. Then traverse the graph for all sets of nodes that
are connected. That result set would quickly explode in size, but you could
restrict results to a minimum N connections. I'm not super familiar with
GraphX myself, however. My intuition is saying 'graph problem' though.
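
For what it's worth, here is a single-machine sketch of the chain-following idea in plain Python (not Spark or GraphX code). It assumes each left-hand value points at no more than one right-hand value, as in Guillermo's sample; `resolve_terminals` is a name I made up:

```python
def resolve_terminals(pairs):
    """Replace the right-hand side of each pair with the end of its chain.

    Assumes each node has at most one outgoing edge. A self-loop such as
    (y, y) is treated as a terminal, and a 'seen' set guards against cycles.
    """
    next_of = dict(pairs)

    def terminal(node):
        seen = set()
        while node in next_of and next_of[node] != node and node not in seen:
            seen.add(node)
            node = next_of[node]
        return node

    return sorted((src, terminal(dst)) for src, dst in pairs)


pairs = [("a", "b"), ("x", "y"), ("b", "c"), ("y", "y"), ("c", "d")]
result = resolve_terminals(pairs)
print(result)
# [('a', 'd'), ('b', 'd'), ('c', 'd'), ('x', 'y'), ('y', 'y')]
```

On a cluster the `next_of` lookup would have to become a repeated join (or a graph traversal), since no single machine holds all the edges.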

Thoughts?


On Wed, Feb 24, 2016 at 6:43 PM, Jakob Odersky  wrote:

> Hi Guillermo,
> assuming that the first "a,b" is a typo and you actually meant "a,d",
> this is a sorting problem.
>
> You could easily model your data as an RDD of tuples (or as a
> dataframe/set) and use the sortBy (or orderBy for dataframe/sets)
> methods.
>
> best,
> --Jakob
>
> On Wed, Feb 24, 2016 at 2:26 PM, Guillermo Ortiz 
> wrote:
> > I want to do some algorithm in Spark.. I know how to do it in a single
> > machine where all data are together, but I don't know a good way to do it
> > in Spark.
> >
> > If someone has an idea..
> > I have some data like this
> > a , b
> > x , y
> > b , c
> > y , y
> > c , d
> >
> > I want something like:
> > a , d
> > b , d
> > c , d
> > x , y
> > y , y
> >
> > I need to know that a->b->c->d, so a->d, b->d and c->d.
> > I don't want the code, just an idea how I could deal with it.
> >
> > Any idea?
>
>
>


Count job stalling at shuffle stage on 3.4TB input (but only 5.3GB shuffle write)

2016-02-23 Thread James Hammerton
Hi,

I have been having problems processing a 3.4TB data set - uncompressed tab
separated text - containing object creation/update events from our system,
one event per line.

I decided to see what happens with a count of the number of events (=
number of lines in the text files) and a count of the number of distinct
object ids, which I thought should be straightforward enough to succeed.

The job stalled at the end of the first stage (55657 tasks, albeit 1 failed
but I've seen processing continue to the next stage despite small numbers
of failures) despite only generating a 5.3GB shuffle. It ran for 2.5 hours
and is now sitting apparently doing nothing.

Does this suggest something is wrong with the cluster? Computing either
event count should be straightforward despite the size of the data set, or
am I missing something?

The set up is a spark-ec2 generated cluster (trying EMR will be my next
move, along with bucketing the data via parquet)  running Spark 1.5.2,
openjdk 8 (this is a scala job though, but others are java), r3.2xlarge
instance types, 5 slaves each with 500GB EBS volumes which SPARK_LOCAL_DIRS
points to.

The code is:

> val sc = new SparkContext(conf);
> try {
>   val rawSchema = StructType(Array(
>     StructField("objectId", DataTypes.StringType, true),
>     StructField("eventName", DataTypes.StringType, true),
>     StructField("eventJson", DataTypes.StringType, true),
>     StructField("timestampNanos", DataTypes.StringType, true)))
>   val sqlContext = new SQLContext(sc)
>   val df = sqlContext.read
>     .format("com.databricks.spark.csv")
>     .option("header", "false")
>     .option("delimiter", "\t")
>     .schema(rawSchema)
>     .load(inputPath)
>   val oids = df.select("objectId")
>   val distinct = oids.distinct.count
>   val events = oids.count
>   println("Number of objectIds: " + distinct);
>   println("Number of events: " + events);
>   println("Elapsed time: " + (System.currentTimeMillis() - startMillis)/1000 + "s")


Here's the plan as revealed by the SQL part of the UI:

== Parsed Logical Plan ==
> Aggregate [count(1) AS count#4L]
>  Aggregate [objectId#0], [objectId#0]
>   Project [objectId#0]
>Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
> 
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)
>
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#4L]
>  Aggregate [objectId#0], [objectId#0]
>   Project [objectId#0]
>Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
> 
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)
>
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#4L]
>  Aggregate [objectId#0]
>   Project [objectId#0]
>Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
> 
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
> output=[count#4L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], 
> output=[currentCount#7L])
>TungstenAggregate(key=[objectId#0], functions=[], output=[])
> TungstenExchange hashpartitioning(objectId#0)
>  TungstenAggregate(key=[objectId#0], functions=[], output=[objectId#0])
>   Scan 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
>   
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)[objectId#0]
>
> Code Generation: true
>
>
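
As I read the physical plan above, the distinct count runs in two phases: each input split is de-duplicated locally, the surviving object ids are hash-partitioned across the shuffle, and each shuffle partition is de-duplicated again before the counts are summed. A minimal sketch of that reading in plain Python (not Spark code; `distinct_count` and its parameters are made up for illustration):

```python
def distinct_count(input_splits, num_shuffle_partitions=4):
    """Two-phase distinct count: local de-duplication, hash shuffle, final count.

    Returns (number of distinct ids, number of ids sent through the shuffle).
    """
    # Phase 1: each split drops its own duplicates before anything is shuffled.
    partials = [set(split) for split in input_splits]

    # Shuffle: route every surviving id to a partition chosen by its hash.
    buckets = [set() for _ in range(num_shuffle_partitions)]
    shuffled = 0
    for partial in partials:
        for object_id in partial:
            buckets[hash(object_id) % num_shuffle_partitions].add(object_id)
            shuffled += 1

    # Phase 2: each partition counts its ids, then the partial counts are summed.
    return sum(len(bucket) for bucket in buckets), shuffled


splits = [["a", "b", "a"], ["b", "c"], ["a", "a", "a"]]
distinct, shuffled = distinct_count(splits)
events = sum(len(split) for split in splits)
print(distinct, shuffled, events)   # 3 5 8
```

If that reading is right, it would explain why the shuffle write (5.3GB) is so much smaller than the 3.4TB input: only locally de-duplicated object ids cross the shuffle.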
Regards,

James


Re: Is this likely to cause any problems?

2016-02-19 Thread James Hammerton
Hi,

Having looked at how easy it is to use EMR, I reckon you may be right,
especially if using Java 8 is no more difficult with that than with
spark-ec2 (where I had to install it on the master and slaves and edit the
spark-env.sh).

I'm now curious as to why the Spark documentation (
http://spark.apache.org/docs/latest/index.html) mentions EC2 but not EMR.

Regards,

James


On 19 February 2016 at 14:25, Daniel Siegmann <daniel.siegm...@teamaol.com>
wrote:

> With EMR supporting Spark, I don't see much reason to use the spark-ec2
> script unless it is important for you to be able to launch clusters using
> the bleeding edge version of Spark. EMR does seem to do a pretty decent job
> of keeping up to date - the latest version (4.3.0) supports the latest
> Spark version (1.6.0).
>
> So I'd flip the question around and ask: is there any reason to continue
> using the spark-ec2 script rather than EMR?
>
> On Thu, Feb 18, 2016 at 11:39 AM, James Hammerton <ja...@gluru.co> wrote:
>
>> I have now... So far  I think the issues I've had are not related to
>> this, but I wanted to be sure in case it should be something that needs to
>> be patched. I've had some jobs run successfully but this warning appears in
>> the logs.
>>
>> Regards,
>>
>> James
>>
>> On 18 February 2016 at 12:23, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Have you seen this ?
>>>
>>> HADOOP-10988
>>>
>>> Cheers
>>>
>>> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton <ja...@gluru.co> wrote:
>>>
>>>> HI,
>>>>
>>>> I am seeing warnings like this in the logs when I run Spark jobs:
>>>>
>>>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>>>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
>>>> disabled stack guard. The VM will try to fix the stack guard now.
>>>> It's highly recommended that you fix the library with 'execstack -c 
>>>> ', or link it with '-z noexecstack'.
>>>>
>>>>
>>>> I used spark-ec2 to launch the cluster with the default AMI, Spark
>>>> 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
>>>> written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
>>>> is m4.large.
>>>>
>>>> Could this contribute to any problems running the jobs?
>>>>
>>>> Regards,
>>>>
>>>> James
>>>>
>>>
>>>
>>
>


Re: Is this likely to cause any problems?

2016-02-18 Thread James Hammerton
I have now... So far  I think the issues I've had are not related to this,
but I wanted to be sure in case it should be something that needs to be
patched. I've had some jobs run successfully but this warning appears in
the logs.

Regards,

James

On 18 February 2016 at 12:23, Ted Yu <yuzhih...@gmail.com> wrote:

> Have you seen this ?
>
> HADOOP-10988
>
> Cheers
>
> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton <ja...@gluru.co> wrote:
>
>> HI,
>>
>> I am seeing warnings like this in the logs when I run Spark jobs:
>>
>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have disabled 
>> stack guard. The VM will try to fix the stack guard now.
>> It's highly recommended that you fix the library with 'execstack -c 
>> ', or link it with '-z noexecstack'.
>>
>>
>> I used spark-ec2 to launch the cluster with the default AMI, Spark 1.5.2,
>> hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd written
>> some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master is
>> m4.large.
>>
>> Could this contribute to any problems running the jobs?
>>
>> Regards,
>>
>> James
>>
>
>


Re: Is this likely to cause any problems?

2016-02-18 Thread James Hammerton
I'm fairly new to Spark.

The documentation suggests using the spark-ec2 script to launch clusters in
AWS, hence I used it.

Would EMR offer any advantage?

Regards,

James


On 18 February 2016 at 14:04, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> Just out of sheer curiosity why are you not using EMR to start your SPARK
> cluster?
>
>
> Regards,
> Gourav
>
> On Thu, Feb 18, 2016 at 12:23 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Have you seen this ?
>>
>> HADOOP-10988
>>
>> Cheers
>>
>> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton <ja...@gluru.co> wrote:
>>
>>> HI,
>>>
>>> I am seeing warnings like this in the logs when I run Spark jobs:
>>>
>>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
>>> disabled stack guard. The VM will try to fix the stack guard now.
>>> It's highly recommended that you fix the library with 'execstack -c 
>>> ', or link it with '-z noexecstack'.
>>>
>>>
>>> I used spark-ec2 to launch the cluster with the default AMI, Spark
>>> 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
>>> written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
>>> is m4.large.
>>>
>>> Could this contribute to any problems running the jobs?
>>>
>>> Regards,
>>>
>>> James
>>>
>>
>>
>


Is this likely to cause any problems?

2016-02-18 Thread James Hammerton
Hi,

I am seeing warnings like this in the logs when I run Spark jobs:

OpenJDK 64-Bit Server VM warning: You have loaded library
/root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have
disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c
<libfile>', or link it with '-z noexecstack'.


I used spark-ec2 to launch the cluster with the default AMI, Spark 1.5.2,
hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd written
some jobs in Java 8. The 6 worker nodes are m4.2xlarge and the master is
m4.large.

Could this contribute to any problems running the jobs?

Regards,

James


pyspark.DataFrame.dropDuplicates

2016-02-12 Thread James Barney
Hi all,
Just wondering what the actual logic governing DataFrame.dropDuplicates()
is? For example:

>>> from pyspark.sql import Row
>>> df = sc.parallelize([
...     Row(name='Alice', age=5, height=80, itemsInPocket=['pen', 'pencil', 'paper']),
...     Row(name='Alice', age=5, height=80, itemsInPocket=['pen', 'pencil', 'paper']),
...     Row(name='Alice', age=10, height=80, itemsInPocket=['pen', 'pencil'])]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+--------------------+
|age|height| name|       itemsInPocket|
+---+------+-----+--------------------+
|  5|    80|Alice|[pen, pencil, paper]|
| 10|    80|Alice|       [pen, pencil]|
+---+------+-----+--------------------+
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+--------------------+
|age|height| name|       itemsInPocket|
+---+------+-----+--------------------+
|  5|    80|Alice|[pen, pencil, paper]|
+---+------+-----+--------------------+

What determines which row is kept and which is deleted? First to appear? Or
random?

I would like to guarantee that the row with the longest list itemsInPocket
is kept. How can I do that?
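
To be explicit about the rule I'm after, here it is as a plain-Python sketch (not PySpark; rows are plain dicts and `keep_longest` is a name made up for illustration): for each (name, height) key, keep the row whose itemsInPocket list is longest, with ties going to the first row seen.

```python
def keep_longest(rows, key_fields, list_field):
    """Keep one row per key: the row whose list_field is longest.

    On a tie the first row seen wins, which makes the choice deterministic.
    """
    best = {}
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        if key not in best or len(row[list_field]) > len(best[key][list_field]):
            best[key] = row
    return list(best.values())


rows = [
    {"name": "Alice", "age": 10, "height": 80, "itemsInPocket": ["pen", "pencil"]},
    {"name": "Alice", "age": 5, "height": 80, "itemsInPocket": ["pen", "pencil", "paper"]},
    {"name": "Alice", "age": 5, "height": 80, "itemsInPocket": ["pen", "pencil", "paper"]},
]
kept = keep_longest(rows, ["name", "height"], "itemsInPocket")
print(kept)
```

What I need is the DataFrame equivalent of this selection.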

Thanks,

James


Re: Extract all the values from describe

2016-02-08 Thread James Barney
Hi Arunkumar,
From the Scala documentation, it's recommended to use the agg function for
performing any actual statistics programmatically on your data.
df.describe() is meant only for data exploration.

See Aggregator here:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.Aggregator



On Mon, Feb 8, 2016 at 5:24 AM, Arunkumar Pillai 
wrote:

> hi
>
>
> I have a dataframe df and I use df.describe() to get the stats values, but
> I am not able to parse and extract all the individual information. Please help
>
> --
> Thanks and Regards
> Arun
>


Recursive nested wildcard directory walking in Spark

2015-12-09 Thread James Ding
Hi!

My name is James, and I’m working on a question there doesn’t seem to be a
lot of answers about online. I was hoping spark/hadoop gurus could shed some
light on this.

I have a data feed on NFS that looks like /foobar/.gz
Currently I have a spark scala job that calls
sparkContext.textFile("/foo/*/*/*/bar/*.gz")
Upstream owners for the data feed have told me they may add additional
nested directories or remove them from files relevant to me. In other words,
files relevant to my spark job might sit on paths that look like:
* /foo/a/b/c/d/bar/*.gz
* /foo/a/b/bar/*.gz
They will do this with only some files and without warning. Anyone have
ideas on how I can configure spark to create an RDD from any textfiles that
fit the pattern /foo/**/bar/*.gz where ** represents a variable number of
wildcard directories?
I'm working with on the order of 10^5 to 10^6 files, which has discouraged me
from using something besides the Hadoop fs API to walk the filesystem and feed
that input to my spark job, but I'm open to suggestions here also.
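
To show the kind of walk I mean, here is a minimal sketch in plain Python using only the standard library (not the Hadoop fs API). It finds every directory named "bar" at any depth under a root and emits one glob per directory rather than one path per file; `feed_globs` and its parameters are names made up for illustration:

```python
import fnmatch
import os


def feed_globs(root, leaf_dir="bar", pattern="*.gz"):
    """Return one '<dir>/*.gz' glob for every directory named leaf_dir,
    at any depth under root, that contains at least one matching file."""
    globs = []
    for dirpath, _dirnames, filenames in os.walk(root):
        if os.path.basename(dirpath) != leaf_dir:
            continue
        if any(fnmatch.fnmatch(name, pattern) for name in filenames):
            globs.append(os.path.join(dirpath, pattern))
    return sorted(globs)
```

As far as I know, textFile accepts a comma-separated list of paths, so something like ",".join(feed_globs("/foo")) could be passed to it. Emitting one glob per directory keeps that string far shorter than listing every file.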
Thanks!
James Ding






Re: Recursive nested wildcard directory walking in Spark

2015-12-09 Thread James Ding
I've set "mapreduce.input.fileinputformat.input.dir.recursive" to "true" in
the SparkConf I use to instantiate SparkContext, and I confirmed this at
runtime by printing the property from my Scala job, but
sparkContext.textFile("/foo/*/bar/*.gz") still fails (so do /foo/**/bar/*.gz
and /foo/*/*/bar/*.gz).

Any thoughts or workarounds? I’m considering using bash globbing to match
files recursively and feed hundreds of thousands of arguments to
spark-submit. Reasons for/against?

From:  Ted Yu <yuzhih...@gmail.com>
Date:  Wednesday, December 9, 2015 at 3:50 PM
To:  James Ding <jd...@palantir.com>
Cc:  "user@spark.apache.org" <user@spark.apache.org>
Subject:  Re: Recursive nested wildcard directory walking in Spark

Have you seen this thread ?

http://search-hadoop.com/m/q3RTt2uhMX1UhnCc1&subj=Re+Does+sc+newAPIHadoopFile+support+multiple+directories+or+nested+directories+

FYI

On Wed, Dec 9, 2015 at 11:18 AM, James Ding <jd...@palantir.com> wrote:
> Hi!
> 
> My name is James, and I’m working on a question there doesn’t seem to be a lot
> of answers about online. I was hoping spark/hadoop gurus could shed some light
> on this.
> 
> I have a data feed on NFS that looks like /foobar/.gz
> Currently I have a spark scala job that calls
> sparkContext.textFile("/foo/*/*/*/bar/*.gz")
> Upstream owners for the data feed have told me they may add additional nested
> directories or remove them from files relevant to me. In other words, files
> relevant to my spark job might sit on paths that look like:
> * /foo/a/b/c/d/bar/*.gz
> * /foo/a/b/bar/*.gz
> They will do this with only some files and without warning. Anyone have ideas
> on how I can configure spark to create an RDD from any textfiles that fit the
> pattern /foo/**/bar/*.gz where ** represents a variable number of wildcard
> directories?
> I'm working with on order of 10^5 and 10^6 files which has discouraged me from
> using something besides Hadoop fs API to walk the filesystem and feed that
> input to my spark job, but I'm open to suggestions here also.
> Thanks!
> James Ding







Re: Setting executors per worker - Standalone

2015-09-29 Thread James Pirz
Thanks for your help.
You were correct about the memory settings. Previously I had the following
config:

--executor-memory 8g --conf spark.executor.cores=1

Which was really conflicting, as in spark-env.sh I had:

export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=8192m

So the memory budget per worker was not enough to launch several executors.
By switching to:

--executor-memory 2g --conf spark.executor.cores=1

Now I can see that on each machine I have one worker, with 4 executors.
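
The arithmetic behind this, as a simplified sketch in plain Python (not Spark code): it assumes a worker can launch executors until it runs out of either cores or memory and ignores any per-executor overhead. `executors_per_worker` is a name made up for illustration:

```python
def executors_per_worker(worker_cores, worker_mem_mb,
                         executor_cores, executor_mem_mb):
    """Executors that fit on one worker, limited by cores and by memory."""
    by_cores = worker_cores // executor_cores
    by_memory = worker_mem_mb // executor_mem_mb
    return min(by_cores, by_memory)


# SPARK_WORKER_CORES=4, SPARK_WORKER_MEMORY=8192m
old = executors_per_worker(4, 8192, executor_cores=1, executor_mem_mb=8 * 1024)
new = executors_per_worker(4, 8192, executor_cores=1, executor_mem_mb=2 * 1024)
print(old, new)   # 1 4
```

With 8g per executor, memory allowed only one executor per worker even though cores allowed four; with 2g both limits allow four.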

Thanks again for your help.


On Tue, Sep 29, 2015 at 1:30 AM, Robin East <robin.e...@xense.co.uk> wrote:

> I'm currently testing this exact setup - it works for me using both --conf
> spark.executor.cores=1 and --executor-cores 1. Do you have some memory
> settings that need to be adjusted as well? Or do you accidentally have
> --total-executor-cores set as well? You should be able to tell from looking
> at the environment tab on the Application UI
>
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
> On 29 Sep 2015, at 04:47, James Pirz <james.p...@gmail.com> wrote:
>
> Thanks for your reply.
>
> Setting it as
>
> --conf spark.executor.cores=1
>
> when I start spark-shell (as an example application) indeed sets the
> number of cores per executor to 1 (it was 4 before), but I still have 1
> executor per worker. What I am really looking for is 1 worker with 4
> executors (each with one core) per machine when I run my application. Based
> on the documentation it seems feasible, but it is not clear how.
>
> Thanks.
>
> On Mon, Sep 28, 2015 at 8:46 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> use "--executor-cores 1" you will get 4 executors per worker since you
>> have 4 cores per worker
>>
>>
>>
>> On Tue, Sep 29, 2015 at 8:24 AM, James Pirz <james.p...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am using Spark 1.5 (standalone mode) on a cluster with 10 nodes, where
>>> each machine has 12GB of RAM and 4 cores. On each machine I have one worker
>>> which is running one executor that grabs all 4 cores. I am interested to
>>> check the performance with "one worker but 4 executors per machine - each
>>> with one core".
>>>
>>> I can see that "running multiple executors per worker in Standalone
>>> mode" is possible based on the closed issue:
>>>
>>> https://issues.apache.org/jira/browse/SPARK-1706
>>>
>>> But I can not find a way to do that. "SPARK_EXECUTOR_INSTANCES" is only
>>> available for the Yarn mode, and in the standalone mode I can just set
>>> "SPARK_WORKER_INSTANCES" and "SPARK_WORKER_CORES" and "SPARK_WORKER_MEMORY".
>>>
>>> Any hint or suggestion would be great.
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>


Re: Setting executors per worker - Standalone

2015-09-28 Thread James Pirz
Thanks for your reply.

Setting it as

--conf spark.executor.cores=1

when I start spark-shell (as an example application) indeed sets the number
of cores per executor to 1 (it was 4 before), but I still have 1 executor
per worker. What I am really looking for is 1 worker with 4 executors
(each with one core) per machine when I run my application. Based on the
documentation it seems feasible, but it is not clear how.

Thanks.

On Mon, Sep 28, 2015 at 8:46 PM, Jeff Zhang <zjf...@gmail.com> wrote:

> use "--executor-cores 1" you will get 4 executors per worker since you
> have 4 cores per worker
>
>
>
> On Tue, Sep 29, 2015 at 8:24 AM, James Pirz <james.p...@gmail.com> wrote:
>
>> Hi,
>>
>> I am using Spark 1.5 (standalone mode) on a cluster with 10 nodes, where
>> each machine has 12GB of RAM and 4 cores. On each machine I have one worker
>> which is running one executor that grabs all 4 cores. I am interested to
>> check the performance with "one worker but 4 executors per machine - each
>> with one core".
>>
>> I can see that "running multiple executors per worker in Standalone mode"
>> is possible based on the closed issue:
>>
>> https://issues.apache.org/jira/browse/SPARK-1706
>>
>> But I can not find a way to do that. "SPARK_EXECUTOR_INSTANCES" is only
>> available for the Yarn mode, and in the standalone mode I can just set
>> "SPARK_WORKER_INSTANCES" and "SPARK_WORKER_CORES" and "SPARK_WORKER_MEMORY".
>>
>> Any hint or suggestion would be great.
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Setting executors per worker - Standalone

2015-09-28 Thread James Pirz
Hi,

I am using Spark 1.5 (standalone mode) on a cluster with 10 nodes, where
each machine has 12GB of RAM and 4 cores. On each machine I have one worker
which is running one executor that grabs all 4 cores. I am interested to
check the performance with "one worker but 4 executors per machine - each
with one core".

I can see that "running multiple executors per worker in Standalone mode"
is possible based on the closed issue:

https://issues.apache.org/jira/browse/SPARK-1706

But I can not find a way to do that. "SPARK_EXECUTOR_INSTANCES" is only
available for the Yarn mode, and in the standalone mode I can just set
"SPARK_WORKER_INSTANCES" and "SPARK_WORKER_CORES" and "SPARK_WORKER_MEMORY".

Any hint or suggestion would be great.


Unreachable dead objects permanently retained on heap

2015-09-25 Thread James Aley
Hi,

We have an application that submits several thousand jobs within the same
SparkContext, using a thread pool to run about 50 in parallel. We're
running on YARN using Spark 1.4.1 and seeing a problem where our driver is
killed by YARN due to running beyond physical memory limits (no Java OOM
stack trace though).

Plugging in YourKit, I can see that in fact the application is running low
on heap. The suspicious thing we're seeing is that the old generation is
filling up with dead objects, which don't seem to be fully removed during
the stop-the-world sweeps we see happening later in the running of the
application.

With allocation tracking enabled, I can see that maybe 80%+ of that dead
heap space consists of byte arrays, which appear to contain some
snappy-compressed Hadoop configuration data. Many of them are 4MB each,
others hundreds of KBs. The allocation tracking reveals that they were
originally allocated in calls to sparkContext.hadoopFile() (from
AvroRelation in spark-avro). It seems that this data was broadcast to the
executors as a result of that call? I'm not clear on the implementation
details, but I can imagine that might be necessary?

This application is essentially a batch job that takes many Avro files and
merges them into larger Parquet files. It builds a DataFrame
of Avro files, then for each DataFrame starts a job using
.coalesce(N).write().parquet() on a fixed-size thread pool.

It seems that for each of those calls, another chunk of heap space
disappears to one of these byte arrays and is never reclaimed. I understand
that broadcast variables remain in memory on the driver application in
their serialized form, and that at least appears to be consistent with what
I'm seeing here. Question is, what can we do about this? Is there a way to
reclaim this memory? Should those arrays be GC'ed when jobs finish?

Any guidance greatly appreciated.


Many thanks,

James.


Java UDFs in GROUP BY expressions

2015-09-07 Thread James Aley
Hi everyone,

I raised this JIRA ticket back in July:
https://issues.apache.org/jira/browse/SPARK-9435

The problem is that it seems Spark SQL doesn't recognise columns we
transform with a UDF when referenced in the GROUP BY clause. There's a
minimal reproduction Java file attached to illustrate the issue.

The equivalent code from Scala seems to work fine for me. Is anyone else
seeing this problem? For us, the attached code fails every time on Spark
1.4.1


Thanks,

James


RE: Feedback: Feature request

2015-08-28 Thread Murphy, James
This is great and much appreciated. Thank you.
- Jim

From: Manish Amde [mailto:manish...@gmail.com]
Sent: Friday, August 28, 2015 9:20 AM
To: Cody Koeninger
Cc: Murphy, James; user@spark.apache.org; d...@spark.apache.org
Subject: Re: Feedback: Feature request

Sounds good. It's a request I have seen a few times in the past and have needed 
it personally. Maybe Joseph Bradley has something to add.

I think a JIRA to capture this will be great. We can move this discussion to 
the JIRA then.

On Friday, August 28, 2015, Cody Koeninger <c...@koeninger.org> wrote:

I wrote some code for this a while back, pretty sure it didn't need access to 
anything private in the decision tree / random forest model.  If people want it 
added to the api I can put together a PR.

I think it's important to have separately parseable operators / operands 
though.  E.g

lhs:0,op:=,rhs:-35.0
On Aug 28, 2015 12:03 AM, Manish Amde <manish...@gmail.com> wrote:
Hi James,

It's a good idea. A JSON format is more convenient for visualization, though a 
little inconvenient to read. How about a toJson() method? It might make the mllib 
api inconsistent across models though.

You should probably create a JIRA for this.

CC: dev list

-Manish

On Aug 26, 2015, at 11:29 AM, Murphy, James <james.mur...@disney.com> wrote:
Hey all,

In working with the DecisionTree classifier, I found it difficult to extract 
rules that could easily facilitate visualization with libraries like D3.

So for example, using print(model.toDebugString()), I get the following 
result:

If (feature 0 = -35.0)
  If (feature 24 = 176.0)
    Predict: 2.1
  If (feature 24 = 176.0)
    Predict: 4.2
  Else (feature 24  176.0)
    Predict: 6.3
Else (feature 0  -35.0)
  If (feature 24 = 11.0)
    Predict: 4.5
  Else (feature 24  11.0)
    Predict: 10.2

But ideally, I could see results in a more parseable format like JSON:


{
  "node": [
    {
      "name": "node1",
      "rule": "feature 0 = -35.0",
      "children": [
        {
          "name": "node2",
          "rule": "feature 24 = 176.0",
          "children": [
            { "name": "node4", "rule": "feature 20  116.0", "predict": 2.1 },
            { "name": "node5", "rule": "feature 20 = 116.0", "predict": 4.2 },
            { "name": "node5", "rule": "feature 20  116.0", "predict": 6.3 }
          ]
        },
        {
          "name": "node3",
          "rule": "feature 0  -35.0",
          "children": [
            { "name": "node7", "rule": "feature 3 = 11.0", "predict": 4.5 },
            { "name": "node8", "rule": "feature 3  11.0", "predict": 10.2 }
          ]
        }
      ]
    }
  ]
}

Food for thought!

Thanks,

Jim



Feedback: Feature request

2015-08-26 Thread Murphy, James
Hey all,

In working with the DecisionTree classifier, I found it difficult to extract 
rules that could easily facilitate visualization with libraries like D3.

So for example, using print(model.toDebugString()), I get the following 
result:

If (feature 0 = -35.0)
  If (feature 24 = 176.0)
    Predict: 2.1
  If (feature 24 = 176.0)
    Predict: 4.2
  Else (feature 24  176.0)
    Predict: 6.3
Else (feature 0  -35.0)
  If (feature 24 = 11.0)
    Predict: 4.5
  Else (feature 24  11.0)
    Predict: 10.2

But ideally, I could see results in a more parseable format like JSON:


{
  "node": [
    {
      "name": "node1",
      "rule": "feature 0 = -35.0",
      "children": [
        {
          "name": "node2",
          "rule": "feature 24 = 176.0",
          "children": [
            { "name": "node4", "rule": "feature 20  116.0", "predict": 2.1 },
            { "name": "node5", "rule": "feature 20 = 116.0", "predict": 4.2 },
            { "name": "node5", "rule": "feature 20  116.0", "predict": 6.3 }
          ]
        },
        {
          "name": "node3",
          "rule": "feature 0  -35.0",
          "children": [
            { "name": "node7", "rule": "feature 3 = 11.0", "predict": 4.5 },
            { "name": "node8", "rule": "feature 3  11.0", "predict": 10.2 }
          ]
        }
      ]
    }
  ]
}

Food for thought!

Thanks,

Jim
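
Until something like this exists in the API, the debug string itself could probably be converted on the client side. The following is a rough sketch only. It assumes a strictly binary listing in which every "If (...)" line is followed by its subtree, then a matching "Else (...)" line and its subtree, with leaves written as "Predict: <value>". The function names and the output layout (rule / children / predict keys) are illustrative, not an MLlib API.

```python
import json
import re

_COND = re.compile(r"^(If|Else) \((.+)\)$")


def debug_string_to_tree(text):
    """Parse an If/Else/Predict listing into nested dicts."""
    lines = [ln.strip() for ln in text.splitlines()]
    # Drop headers and blank lines; keep only the tree lines.
    lines = [ln for ln in lines if ln.startswith(("If ", "Else ", "Predict:"))]

    def parse(i):
        line = lines[i]
        if line.startswith("Predict:"):
            return {"predict": float(line.split(":", 1)[1])}, i + 1
        if_match = _COND.match(line)
        if if_match is None or if_match.group(1) != "If":
            raise ValueError("expected an If line, got: %r" % line)
        left, i = parse(i + 1)
        else_match = _COND.match(lines[i])
        if else_match is None or else_match.group(1) != "Else":
            raise ValueError("expected an Else line, got: %r" % lines[i])
        right, i = parse(i + 1)
        node = {"children": [
            dict(rule=if_match.group(2), **left),
            dict(rule=else_match.group(2), **right),
        ]}
        return node, i

    tree, _ = parse(0)
    return tree


def debug_string_to_json(text):
    """Serialize the parsed tree, e.g. for a D3 hierarchy layout."""
    return json.dumps(debug_string_to_tree(text), indent=2)
```

Indentation is deliberately ignored, since the If/Else pairing alone determines the structure of a binary tree.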



Repartitioning external table in Spark sql

2015-08-18 Thread James Pirz
I am using Spark 1.4.1 in stand-alone mode on a cluster of 3 nodes.

Using Spark sql and Hive Context, I am trying to run a simple scan query on
an existing Hive table (which is an external table consisting of rows in
text files stored in HDFS - it is NOT parquet, ORC or any other richer
format).

DataFrame res = hiveCtx.sql(SELECT * FROM lineitem WHERE L_LINENUMBER 
0);

What I observe is that the performance of this full scan in Spark is not
comparable with Hive (it is almost 4 times slower). Checking the resource
usage, I see that the workers/executors do not scan in parallel but
on a per-node basis: first the executors from the worker(s) on node 1
read from disk, while the other two nodes do no I/O and just receive
data from the first node over the network; then the 2nd node does the scan,
and then the third one.
I also realized that if I load this data file directly from my spark
context (using textFile() ) and run count() on that (not using spark sql)
then I can get a better performance by increasing number of partitions. I
am just trying to do the same thing (increasing number of partitions in the
beginning) in Spark sql as:

var tab = sqlContext.read.table("lineitem");
tab.repartition(1000);
OR
tab.coalesce(1000);

but neither repartition() nor coalesce() appears to have any effect - they do not
return an error, but if I check

var p = tab.rdd.partitions.size;

before and after calling any of them, it returns the same number of
partitions.

I am just wondering how I can change the number of partitions for a Hive
external table, in Spark Sql.

Any help/suggestion would be appreciated.


Re: worker and executor memory

2015-08-14 Thread James Pirz
Additional Comment:
I checked the disk usage on the 3 nodes (using iostat) and it seems that
reading from HDFS partitions happens on a node-by-node basis. Only one of
the nodes shows active IO (as read) at any given time while the other two
nodes are idle IO-wise. I am not sure why the tasks are scheduled that way,
as it is a map-only job and reading can happen in parallel.

On Thu, Aug 13, 2015 at 9:10 PM, James Pirz <james.p...@gmail.com> wrote:

 Hi,

 I am using Spark 1.4 on a cluster (stand-alone mode), across 3 machines,
 for a workload similar to TPCH (analytical queries with multiple/multi-way
 large joins and aggregations). Each machine has 12GB of Memory and 4 cores.
 My total data size is 150GB, stored in HDFS (stored as Hive tables), and I
 am running my queries through Spark SQL using hive context.
 After checking the performance tuning documents on the spark page and some
 clips from latest spark summit, I decided to set the following configs in
 my spark-env:

 SPARK_WORKER_INSTANCES=4
 SPARK_WORKER_CORES=1
 SPARK_WORKER_MEMORY=2500M

 (As my tasks tend to be long so the overhead of starting multiple JVMs,
 one per worker is much less than the total query times). As I monitor the
 job progress, I realized that while the Worker memory is 2.5GB, the
 executors (one per worker) have max memory of 512MB (which is default). I
 enlarged this value in my application as:

 conf.set("spark.executor.memory", "2.5g");

 Trying to give max available memory on each worker to its only executor,
 but I observed that my queries are running slower than the prev case
 (default 512MB). Changing 2.5g to 1g improved the performance time, it is
 close to but still worse than 512MB case. I guess what I am missing here is
 what is the relationship between the WORKER_MEMORY and 'executor.memory'.

 - Isn't it the case that WORKER tries to split this memory among its
 executors (in my case its only executor) ? Or there are other stuff being
 done worker which need memory ?

 - What other important parameters I need to look into and tune at this
 point to get the best response time out of my HW ? (I have read about Kryo
 serializer, and I am about trying that - I am mainly concerned about memory
 related settings and also knobs related to parallelism of my jobs). As an
 example, for a simple scan-only query, Spark is worse than Hive (almost 3
 times slower) while both are scanning the exact same table  file format.
 That is why I believe I am missing some params by leaving them as defaults.

 Any hint/suggestion would be highly appreciated.





worker and executor memory

2015-08-13 Thread James Pirz
Hi,

I am using Spark 1.4 on a cluster (stand-alone mode), across 3 machines,
for a workload similar to TPCH (analytical queries with multiple/multi-way
large joins and aggregations). Each machine has 12GB of Memory and 4 cores.
My total data size is 150GB, stored in HDFS (stored as Hive tables), and I
am running my queries through Spark SQL using hive context.
After checking the performance tuning documents on the spark page and some
clips from latest spark summit, I decided to set the following configs in
my spark-env:

SPARK_WORKER_INSTANCES=4
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=2500M

(My tasks tend to be long, so the overhead of starting multiple JVMs, one
per worker, is much less than the total query time.) As I monitored the job
progress, I realized that while the worker memory is 2.5GB, the executors
(one per worker) have a max memory of 512MB (which is the default). I enlarged
this value in my application as:

conf.set("spark.executor.memory", "2.5g");

trying to give the maximum available memory on each worker to its only executor,
but I observed that my queries run slower than in the previous case
(default 512MB). Changing 2.5g to 1g improved the running time; it is
close to, but still worse than, the 512MB case. I guess what I am missing here is
the relationship between WORKER_MEMORY and 'executor.memory'.

- Isn't it the case that the worker tries to split this memory among its
executors (in my case its only executor)? Or is there other work being
done by the worker which needs memory?

- What other important parameters do I need to look into and tune at this
point to get the best response time out of my HW? (I have read about the Kryo
serializer, and I am about to try that - I am mainly concerned about memory
related settings and also knobs related to the parallelism of my jobs.) As an
example, for a simple scan-only query, Spark is worse than Hive (almost 3
times slower) while both are scanning the exact same table and file format.
That is why I believe I am missing some params by leaving them as defaults.

Any hint/suggestion would be highly appreciated.


Re: SparkSQL: add jar blocks all queries

2015-08-07 Thread Wu, James C.
Hi,

The issue only seems to happen when trying to access spark via the SparkSQL 
Thrift Server interface.

Does anyone know a fix?

james

From: Wu, Walt Disney <james.c...@disney.com>
Date: Friday, August 7, 2015 at 12:40 PM
To: user@spark.apache.org <user@spark.apache.org>
Subject: SparkSQL: add jar blocks all queries

Hi,

I got into a situation where a prior "add jar" command causes Spark SQL to stop 
working for all users.

Does anyone know how to fix the issue?

Regards,

james

From: Wu, Walt Disney <james.c...@disney.com>
Date: Friday, August 7, 2015 at 10:29 AM
To: user@spark.apache.org <user@spark.apache.org>
Subject: SparkSQL: remove jar added by add jar command from dependencies

Hi,

I am using Spark SQL to run some queries on a set of avro data. Somehow I am 
getting this error

0: jdbc:hive2://n7-z01-0a2a1453 select count(*) from flume_test;

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task
3 in stage 26.0 failed 4 times, most recent failure: Lost task 3.3 in stage
26.0 (TID 1027, n7-z01-0a2a1457.iaas.starwave.com): java.io.IOException:
Incomplete HDFS URI, no host: hdfs:data/hive-jars/avro-mapred.jar
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:141)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1364)
    at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:498)
    at org.apache.spark.util.Utils$.fetchFile(Utils.scala:383)
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:350)
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:347)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:347)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


I did not add the jar in this session, so I am wondering how I can get the jar 
removed from the dependencies so that it is not blocking all my Spark SQL 
queries for all sessions.

Thanks,

James


SparkSQL: remove jar added by add jar command from dependencies

2015-08-07 Thread Wu, James C.
Hi,

I am using Spark SQL to run some queries on a set of avro data. Somehow I am 
getting this error

0: jdbc:hive2://n7-z01-0a2a1453 select count(*) from flume_test;

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task
3 in stage 26.0 failed 4 times, most recent failure: Lost task 3.3 in stage
26.0 (TID 1027, n7-z01-0a2a1457.iaas.starwave.com): java.io.IOException:
Incomplete HDFS URI, no host: hdfs:data/hive-jars/avro-mapred.jar
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:141)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1364)
    at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:498)
    at org.apache.spark.util.Utils$.fetchFile(Utils.scala:383)
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:350)
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:347)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:347)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


I did not add the jar in this session, so I am wondering how I can get the jar 
removed from the dependencies so that it is not blocking all my Spark SQL 
queries for all sessions.

Thanks,

James
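
For readers wondering what "no host" refers to in the error above, here is a small illustration using Python's urllib.parse. It is only an analogy for how the URI splits into parts, not Hadoop's own parsing code, and the host name namenode.example.com is a made-up placeholder. The helper name is hypothetical as well.

```python
from urllib.parse import urlparse


def describe_uri(uri):
    """Split a URI into the parts relevant to the 'no host' complaint."""
    parts = urlparse(uri)
    return {"scheme": parts.scheme, "host": parts.hostname, "path": parts.path}


# The URI from the stack trace: a scheme followed directly by a relative path.
broken = describe_uri("hdfs:data/hive-jars/avro-mapred.jar")

# A fully qualified form with an authority (hypothetical host and port).
qualified = describe_uri(
    "hdfs://namenode.example.com:8020/data/hive-jars/avro-mapred.jar")
```

The first form carries a scheme but no authority section, so there is no host to connect to, which appears to be what the jar registered by the earlier "add jar" looked like.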


SparkSQL: add jar blocks all queries

2015-08-07 Thread Wu, James C.
Hi,

I got into a situation where a prior "add jar" command causes Spark SQL to stop 
working for all users.

Does anyone know how to fix the issue?

Regards,

james

From: Wu, Walt Disney <james.c...@disney.com>
Date: Friday, August 7, 2015 at 10:29 AM
To: user@spark.apache.org <user@spark.apache.org>
Subject: SparkSQL: remove jar added by add jar command from dependencies

Hi,

I am using Spark SQL to run some queries on a set of avro data. Somehow I am 
getting this error

0: jdbc:hive2://n7-z01-0a2a1453 select count(*) from flume_test;

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task
3 in stage 26.0 failed 4 times, most recent failure: Lost task 3.3 in stage
26.0 (TID 1027, n7-z01-0a2a1457.iaas.starwave.com): java.io.IOException:
Incomplete HDFS URI, no host: hdfs:data/hive-jars/avro-mapred.jar
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:141)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1364)
    at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:498)
    at org.apache.spark.util.Utils$.fetchFile(Utils.scala:383)
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:350)
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:347)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:347)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


I did not add the jar in this session, so I am wondering how I can get the jar 
removed from the dependencies so that it is not blocking all my Spark SQL 
queries for all sessions.

Thanks,

James


[POWERED BY] Please add our organization

2015-07-24 Thread Baxter, James
Hi there,
Details below.

Organisation: Woodside
URL: http://www.woodside.com.au/
Components: Spark Core 1.3.1/1.4.1 and Spark SQL
Use Case: Spark is being used for near real time predictive analysis over 
millions of equipment sensor readings and within our Data Integration processes 
for data quality analysis and data exploration.


Regards
James Baxter
Technology and Innovation Analyst
ISS
Woodside Energy Ltd.
Woodside Plaza
240 St Georges Terrace
Perth WA 6000
Australia
T: +61 8 9348 4218
F: +61 8 9348 6561
E: james.bax...@woodside.com.au




Streaming: updating broadcast variables

2015-07-03 Thread James Cole
Hi all,

I'm filtering a DStream using a function. I need to be able to change this
function while the application is running (I'm polling a service to see if
a user has changed their filtering). The filter function is a
transformation and runs on the workers, so that's where the updates need to
go. I'm not sure of the best way to do this.

Initially broadcasting seemed like the way to go: the filter is actually
quite large. But I don't think I can update something I've broadcasted.
I've tried unpersisting and re-creating the broadcast variable but it
became obvious this wasn't updating the reference on the worker. So am I
correct in thinking I can't use broadcasted variables for this purpose?

The next option seems to be: stopping the JavaStreamingContext, creating a
new one from the SparkContext, updating the filter function, and
re-creating the DStreams (I'm using direct streams from Kafka).

If I re-created the JavaStreamingContext would the accumulators (which are
created from the SparkContext) keep working? (Obviously I'm going to try
this soon)

In summary:

1) Can broadcasted variables be updated?

2) Is there a better way than re-creating the JavaStreamingContext and
DStreams?

Thanks,

James


Re: Help optimising Spark SQL query

2015-06-30 Thread James Aley
Thanks everybody for the advice on this.

I attached YourKit and found that the CPU time split was about 70% in
Parquet/LZO reading and 30% applying the filter predicate. I guess those
are reasonable things for it to be spending time on, and so it really could
just be a case of needing more hardware to cope with that volume of rows.
That's not such a problem, as the cluster wasn't exactly huge when testing
- just a couple of nodes.

Further, we've not been making use of the partitioning support for Parquet
data, which would actually give us a simple way to control how much
historical data to go sifting through. Turns out we're already writing our
data as type/timestamp/parquet file, we just missed the date=
naming convention - d'oh! At least that means a fairly simple rename script
should get us out of trouble!

Appreciate everyone's tips, thanks again!

James.
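
A rough sketch of what such a rename script might look like, assuming the layout is <root>/<type>/<timestamp>/ and that the goal is <root>/<type>/date=<timestamp>/ so that Spark's Parquet partition discovery can pick up the key. It operates on a local filesystem purely for illustration; on S3 a "rename" is really a copy followed by a delete and would need the appropriate client. The function name and the key default are hypothetical.

```python
import os


def add_partition_prefix(root, key="date"):
    """Rename <root>/<type>/<value>/ to <root>/<type>/<key>=<value>/."""
    renamed = []
    for type_name in sorted(os.listdir(root)):
        type_dir = os.path.join(root, type_name)
        if not os.path.isdir(type_dir):
            continue
        for child in sorted(os.listdir(type_dir)):
            src = os.path.join(type_dir, child)
            # Skip plain files and directories that already use key=value naming.
            if not os.path.isdir(src) or "=" in child:
                continue
            dst = os.path.join(type_dir, "%s=%s" % (key, child))
            os.rename(src, dst)
            renamed.append((src, dst))
    return renamed
```

Running it a second time is a no-op, because directories that already contain "=" are left alone.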


On 23 June 2015 at 17:25, Sabarish Sasidharan 
<sabarish.sasidha...@manthan.com> wrote:

 64GB in parquet could be many billions of rows because of the columnar
 compression. And count distinct by itself is an expensive operation. This
 is not just on Spark, even on Presto/Impala, you would see performance dip
 with count distincts. And the cluster is not that powerful either.

 The one issue here is that Spark has to sift through all the data to get
 to just a week's worth. To achieve better performance you might want to
 partition the data by date/week and then Spark wouldn't have to sift
 through all the billions of rows to get to the millions it needs to
 aggregate.

 Regards
 Sab

 On Tue, Jun 23, 2015 at 4:35 PM, James Aley <james.a...@swiftkey.com>
 wrote:

 Thanks for the suggestions everyone, appreciate the advice.

 I tried replacing DISTINCT with the nested GROUP BY, running on 1.4
 instead of 1.3, replacing the date casts with a between operation on the
 corresponding long constants, and changing COUNT(*) to COUNT(1).
 None of these seems to have made any remarkable difference in running time
 for the query.

 I'll hook up YourKit and see if we can figure out where the CPU time is
 going, then post back.

 On 22 June 2015 at 16:01, Yin Huai <yh...@databricks.com> wrote:

 Hi James,

 Maybe it's the DISTINCT causing the issue.

 I rewrote the query as follows. Maybe this one can finish faster.

 select
   sum(cnt) as uses,
   count(id) as users
 from (
   select
     count(*) cnt,
     cast(id as string) as id
   from usage_events
   where
     from_unixtime(cast(timestamp_millis/1000 as bigint)) between
     '2015-06-09' and '2015-06-16'
   group by cast(id as string)
 ) tmp

 Thanks,

 Yin

 On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke <jornfra...@gmail.com>
 wrote:

 Generally (not only Spark SQL specific) you should not cast in the
 where part of a SQL query. It is also not necessary in your case. Getting
 rid of casts in the whole query will also be beneficial.

 On Mon, 22 June 2015 at 17:29, James Aley <james.a...@swiftkey.com>
 wrote:

 Hello,

 A colleague of mine ran the following Spark SQL query:

 select
   count(*) as uses,
   count (distinct cast(id as string)) as users
 from usage_events
 where
   from_unixtime(cast(timestamp_millis/1000 as bigint))
 between '2015-06-09' and '2015-06-16'

 The table contains billions of rows, but totals only 64GB of data
 across ~30 separate files, which are stored as Parquet with LZO
 compression in S3.

 From the referenced columns:

 * id is Binary, which we cast to a String so that we can DISTINCT by
 it. (I was already told this will improve in a later release, in a
 separate thread.)
 * timestamp_millis is a long, containing a unix timestamp with
 millisecond resolution

 This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
 instances, using 20 executors, each with 4GB memory. I can see from
 monitoring tools that the CPU usage is at 100% on all nodes, but incoming
 network seems a bit low at 2.5MB/s, suggesting to me that this is
 CPU-bound.

 Does that seem slow? Can anyone offer any ideas by glancing at the
 query as to why this might be slow? We'll profile it meanwhile and post
 back if we find anything ourselves.

 A side issue - I've found that this query, and others, sometimes
 completes but doesn't return any results. There appears to be no error
 that I can see in the logs, and Spark reports the job as successful, but
 the connected JDBC client (SQLWorkbenchJ in this case), just sits there
 forever waiting. I did a quick Google and couldn't find anyone else having
 similar issues.


 Many thanks,

 James.









Re: Help optimising Spark SQL query

2015-06-23 Thread James Aley
Thanks for the suggestions everyone, appreciate the advice.

I tried replacing DISTINCT with the nested GROUP BY, running on 1.4 instead
of 1.3, replacing the date casts with a between operation on the
corresponding long constants, and changing COUNT(*) to COUNT(1).
None of these seems to have made any notable difference in running time
for the query.
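
For readers wondering what "a between operation on the corresponding long constants" might look like, here is a minimal Java sketch. It computes the millisecond bounds for the two dates in the original query and builds the rewritten query string. The class and method names are hypothetical, and UTC is assumed for midnight, whereas from_unixtime works in the session time zone, so the bounds may need shifting in practice.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper: names are illustrative, not from the thread.
class EpochBounds {
    // Milliseconds since the Unix epoch at 00:00:00 UTC on the given ISO date.
    static long startOfDayMillis(String isoDate) {
        return LocalDate.parse(isoDate).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    // Builds the query with the casts in the where clause replaced by long constants.
    static String buildQuery(String from, String to) {
        return "select count(1) as uses, count(distinct cast(id as string)) as users"
            + " from usage_events"
            + " where timestamp_millis between " + startOfDayMillis(from)
            + " and " + startOfDayMillis(to);
    }

    public static void main(String[] args) {
        System.out.println(buildQuery("2015-06-09", "2015-06-16"));
    }
}
```

Comparing the raw long column against constants lets the filter run without a per-row cast and string conversion.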

I'll hook up YourKit and see if we can figure out where the CPU time is
going, then post back.

On 22 June 2015 at 16:01, Yin Huai yh...@databricks.com wrote:

 Hi James,

 Maybe it's the DISTINCT causing the issue.

 I rewrote the query as follows. Maybe this one can finish faster.

 select
   sum(cnt) as uses,
   count(id) as users
 from (
   select
     count(*) cnt,
     cast(id as string) as id
   from usage_events
   where
     from_unixtime(cast(timestamp_millis/1000 as bigint)) between
     '2015-06-09' and '2015-06-16'
   group by cast(id as string)
 ) tmp

 Thanks,

 Yin

 On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke jornfra...@gmail.com
 wrote:

 Generally (not only in Spark SQL) you should not cast in the where
 part of a SQL query. It is also not necessary in your case. Getting rid of
 the casts in the whole query will also be beneficial.

 On Mon, 22 June 2015 at 17:29, James Aley james.a...@swiftkey.com
 wrote:

 Hello,

 A colleague of mine ran the following Spark SQL query:

 select
   count(*) as uses,
   count (distinct cast(id as string)) as users
 from usage_events
 where
   from_unixtime(cast(timestamp_millis/1000 as bigint))
 between '2015-06-09' and '2015-06-16'

 The table contains billions of rows, but totals only 64GB of data across
 ~30 separate files, which are stored as Parquet with LZO compression in S3.

 From the referenced columns:

 * id is Binary, which we cast to a String so that we can DISTINCT by
 it. (I was already told this will improve in a later release, in a separate
 thread.)
 * timestamp_millis is a long, containing a unix timestamp with
 millisecond resolution

 This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
 instances, using 20 executors, each with 4GB memory. I can see from
 monitoring tools that the CPU usage is at 100% on all nodes, but incoming
 network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound.

 Does that seem slow? Can anyone offer any ideas by glancing at the query
 as to why this might be slow? We'll profile it meanwhile and post back if
 we find anything ourselves.

 A side issue - I've found that this query, and others, sometimes
 completes but doesn't return any results. There appears to be no error that
 I can see in the logs, and Spark reports the job as successful, but the
 connected JDBC client (SQLWorkbenchJ in this case), just sits there forever
 waiting. I did a quick Google and couldn't find anyone else having similar
 issues.


 Many thanks,

 James.





Help optimising Spark SQL query

2015-06-22 Thread James Aley
Hello,

A colleague of mine ran the following Spark SQL query:

select
  count(*) as uses,
  count (distinct cast(id as string)) as users
from usage_events
where
  from_unixtime(cast(timestamp_millis/1000 as bigint))
between '2015-06-09' and '2015-06-16'

The table contains billions of rows, but totals only 64GB of data across
~30 separate files, which are stored as Parquet with LZO compression in S3.

From the referenced columns:

* id is Binary, which we cast to a String so that we can DISTINCT by it. (I
was already told this will improve in a later release, in a separate
thread.)
* timestamp_millis is a long, containing a unix timestamp with millisecond
resolution

This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
instances, using 20 executors, each with 4GB memory. I can see from
monitoring tools that the CPU usage is at 100% on all nodes, but incoming
network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound.

Does that seem slow? Can anyone offer any ideas by glancing at the query as
to why this might be slow? We'll profile it meanwhile and post back if we
find anything ourselves.

A side issue - I've found that this query, and others, sometimes completes
but doesn't return any results. There appears to be no error that I can see
in the logs, and Spark reports the job as successful, but the connected
JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting.
I did a quick Google and couldn't find anyone else having similar issues.


Many thanks,

James.


Re: Help optimising Spark SQL query

2015-06-22 Thread James Aley
Thanks for the responses, guys!

Sorry, I forgot to mention that I'm using Spark 1.3.0, but I'll test with
1.4.0 and try the codegen suggestion then report back.


On 22 June 2015 at 12:37, Matthew Johnson matt.john...@algomi.com wrote:

 Hi James,



 What version of Spark are you using? In Spark 1.2.2 I had an issue where
 Spark would report a job as complete but I couldn’t find my results
 anywhere – I just assumed it was me doing something wrong as I am still
 quite new to Spark. However, since upgrading to 1.4.0 I have not seen this
 issue, so might be worth upgrading if you are not already on 1.4.



 Cheers,

 Matthew





 *From:* Lior Chaga [mailto:lio...@taboola.com]
 *Sent:* 22 June 2015 17:24
 *To:* James Aley
 *Cc:* user
 *Subject:* Re: Help optimising Spark SQL query



 Hi James,



 There are a few configurations that you can try:


 https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options



 From my experience, codegen really speeds things up. Just run
 sqlContext.sql("SET spark.sql.codegen=true") before you execute your query. But
 keep in mind that this is sometimes buggy (depending on your query), so
 compare against the results without codegen to be sure.

 Also you can try changing the default partitions.



 You can also use dataframes (since 1.3). Not sure they are better than
 specifying the query in 1.3, but with spark 1.4 there should be an enormous
 performance improvement in dataframes.



 Lior



 On Mon, Jun 22, 2015 at 6:28 PM, James Aley james.a...@swiftkey.com
 wrote:

 Hello,



 A colleague of mine ran the following Spark SQL query:



 select
   count(*) as uses,
   count (distinct cast(id as string)) as users
 from usage_events
 where
   from_unixtime(cast(timestamp_millis/1000 as bigint))
 between '2015-06-09' and '2015-06-16'



 The table contains billions of rows, but totals only 64GB of data across
 ~30 separate files, which are stored as Parquet with LZO compression in S3.



 From the referenced columns:



 * id is Binary, which we cast to a String so that we can DISTINCT by it.
 (I was already told this will improve in a later release, in a separate
 thread.)

 * timestamp_millis is a long, containing a unix timestamp with
 millisecond resolution



 This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
 instances, using 20 executors, each with 4GB memory. I can see from
 monitoring tools that the CPU usage is at 100% on all nodes, but incoming
 network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound.



 Does that seem slow? Can anyone offer any ideas by glancing at the query
 as to why this might be slow? We'll profile it meanwhile and post back if
 we find anything ourselves.



 A side issue - I've found that this query, and others, sometimes completes
 but doesn't return any results. There appears to be no error that I can see
 in the logs, and Spark reports the job as successful, but the connected
 JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting.
 I did a quick Google and couldn't find anyone else having similar issues.





 Many thanks,



 James.





Re: Optimisation advice for Avro-Parquet merge job

2015-06-12 Thread James Aley
Hey Kiran,

Thanks very much for the response. I left for vacation before I could try
this out, but I'll experiment once I get back and let you know how it goes.

Thanks!

James.

On 8 June 2015 at 12:34, kiran lonikar loni...@gmail.com wrote:

 It turns out my assumption that load and unionAll are blocking is not
 correct. They are transformations. So instead of running only the load
 and unionAll in the run() methods, I think you will have to save the
 intermediate dfInput[i] to temp (parquet) files (possibly to an in-memory DFS
 like http://tachyon-project.org/) in the run() methods. The second for
 loop will also have to load from the intermediate parquet files. Then
 finally save the final dfInput[0] to HDFS.

 I think this way of parallelizing will force the cluster to utilize
 all the resources.

 -Kiran

 On Mon, Jun 8, 2015 at 12:30 PM, kiran lonikar loni...@gmail.com wrote:

 James,

 As I can see, there are three distinct parts to your program:

- for loop
- synchronized block
- final outputFrame.save statement

 Can you do a separate timing measurement by putting a simple
 System.currentTimeMillis() around these blocks to know how much they are
 taking and then try to optimize where it takes longest? In the second
 block, you may want to measure the time for the two statements. Improving
 this boils down to playing with spark settings.
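
A minimal Java sketch of the timing measurement suggested above, assuming each of the three parts can be wrapped in a Runnable. The StageTimer class and the sleepQuietly stand-ins are hypothetical placeholders for the real for loop, synchronized block and save call.

```java
// Hypothetical helper: wraps a block of work and reports its wall-clock time.
class StageTimer {
    // Runs the stage and returns the elapsed wall-clock time in milliseconds.
    static long timeMillis(Runnable stage) {
        long start = System.currentTimeMillis();
        stage.run();
        return System.currentTimeMillis() - start;
    }

    // Stand-in for real work; replace with the actual stage body.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        long loopMs = timeMillis(() -> sleepQuietly(30)); // placeholder for the for loop
        long syncMs = timeMillis(() -> sleepQuietly(20)); // placeholder for the synchronized block
        long saveMs = timeMillis(() -> sleepQuietly(10)); // placeholder for outputFrame.save
        System.out.println("for loop: " + loopMs + " ms, synchronized block: " + syncMs
            + " ms, save: " + saveMs + " ms");
    }
}
```

Since load and unionAll are lazy in Spark, timings taken this way will mostly reflect whichever stage triggers the actual job.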

 Now consider the first block: I think this is a classic case of merge
 sort or a reduce tree. You already tried to improve this by submitting jobs
 in parallel using executor pool/Callable etc.

 To further improve the parallelization, I suggest you use a reduce-tree-like
 approach. For example, let's say you want to compute the sum of all
 elements of an array in parallel. The way it's solved on a GPU-like
 platform is that you initially divide your input array into chunks of 2, compute
 those n/2 sums in parallel on separate threads and save each result in the
 first of the two elements. In the next iteration, you compute n/4 sums
 of the earlier sums in parallel, and so on until you are left with only two
 elements, whose sum gives you the final sum.

 You are performing many sequential unionAll operations for inputs.size()
 avro files. Assuming the unionAll() on DataFrame is blocking (and not a
 simple transformation like on RDDs) and actually performs the union
 operation, you will certainly benefit by parallelizing this loop. You may
 change the loop to something like below:

 // pseudo code only
 int n = inputs.size()
 // initialize executor
 executor = new FixedThreadPoolExecutor(n/2)
 dfInput = new DataFrame[n/2]
 for (int i = 0; i < n/2; i++) {
     executor.submit(new Runnable() {
         public void run() {
             // union of i and i+n/2
             // showing [] only to bring out array access. Replace with
             // dfInput(i) in your code
             dfInput[i] = sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro")
                 .unionAll(sqlContext.load(inputPaths.get(i + n/2), "com.databricks.spark.avro"))
         }
     });
 }

 executor.awaitTermination(0, TimeUnit.SECONDS)

 int steps = log(n)/log(2.0)
 // the last round (s == steps) has stride 1 and leaves the result in dfInput[0]
 for (s = 2; s <= steps; s++) {
     int stride = n/(1 << s); // n/(2^s)
     for (int i = 0; i < stride; i++) {
         executor.submit(new Runnable() {
             public void run() {
                 // union of i and i+stride
                 // showing [] only to bring out array access. Replace
                 // with dfInput(i) and dfInput(i+stride) in your code
                 dfInput[i] = dfInput[i].unionAll(dfInput[i + stride])
             }
         });
     }
     executor.awaitTermination(0, TimeUnit.SECONDS)
 }
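
The pseudo code above can also be sketched as a small self-contained Java program. This is only an illustration of the reduce-tree pattern on plain values, not Spark code: ReduceTree and its reduce method are hypothetical names, adjacent pairs are combined rather than elements n/2 apart, and invokeAll is used to wait for each round to finish. With DataFrames the combine function would be something like unionAll, keeping in mind that it is lazy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BinaryOperator;

// Hypothetical helper illustrating a reduce tree; not part of any Spark API.
class ReduceTree {
    // Combines the inputs pairwise, round by round, until one value is left.
    // The pairs of a round run in parallel on the pool; order is preserved,
    // so combine only needs to be associative.
    static <T> T reduce(List<T> inputs, BinaryOperator<T> combine, ExecutorService pool)
            throws InterruptedException, ExecutionException {
        if (inputs.isEmpty()) {
            throw new IllegalArgumentException("inputs must not be empty");
        }
        List<T> level = new ArrayList<>(inputs);
        while (level.size() > 1) {
            List<Callable<T>> tasks = new ArrayList<>();
            for (int i = 0; i + 1 < level.size(); i += 2) {
                final T left = level.get(i);
                final T right = level.get(i + 1);
                tasks.add(() -> combine.apply(left, right));
            }
            List<T> next = new ArrayList<>();
            // invokeAll blocks until every task of this round has completed.
            for (Future<T> result : pool.invokeAll(tasks)) {
                next.add(result.get());
            }
            // An odd element out is carried over unchanged to the next round.
            if (level.size() % 2 == 1) {
                next.add(level.get(level.size() - 1));
            }
            level = next;
        }
        return level.get(0);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
            System.out.println(reduce(values, Integer::sum, pool)); // prints 36
        } finally {
            pool.shutdown();
        }
    }
}
```

Each round halves the number of partial results, so n inputs need about log2(n) rounds instead of n-1 sequential combines.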

 Let me know if it helped.

 -Kiran


 On Thu, Jun 4, 2015 at 8:57 PM, James Aley james.a...@swiftkey.com
 wrote:

 Thanks for the confirmation! We're quite new to Spark, so a little
 reassurance is a good thing to have sometimes :-)

 The thing that's concerning me at the moment is that my job doesn't seem
 to run any faster with more compute resources added to the cluster, and
 this is proving a little tricky to debug. There are a lot of variables, so
 here's what we've tried already and the apparent impact. If anyone has any
 further suggestions, we'd love to hear!

 * Increase the minimum number of output files (targetPartitions
 above), so that input groups smaller than our minimum chunk size can still
 be worked on by more than one executor. This does measurably speed things
 up, but obviously it's a trade-off, as the original goal for this job is to
 merge our data into fewer, larger files.

 * Submit many jobs in parallel, by running the above code in a Callable,
 on an executor pool. This seems to help, to some extent, but I'm not sure
 what else needs to be configured alongside it -- driver threads, scheduling
 policy, etc. We set scheduling to FAIR when doing this, as that seemed
 like the right approach, but we're not 100% confident. It seemed to help
 quite substantially anyway, so perhaps this just needs further tuning?

 * Increasing executors

Re: spark-submit does not use hive-site.xml

2015-06-10 Thread James Pirz
Thanks for your help !
Switching to HiveContext fixed the issue.

Just one side comment:
In the documentation regarding Hive Tables and HiveContext
https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables,
we see:

// sc is an existing JavaSparkContext.
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);


But this is incorrect as the constructor in HiveContext does not accept a
JavaSparkContext, but a SparkContext. (the comment is basically
misleading). The correct code snippet should be:

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());


Thanks again for your help.




On Wed, Jun 10, 2015 at 1:17 AM, Cheng Lian lian.cs@gmail.com wrote:

  Hm, this is a common confusion... Although the variable name is
 `sqlContext` in Spark shell, it's actually a `HiveContext`, which extends
 `SQLContext` and has the ability to communicate with Hive metastore.

 So your program needs to instantiate a
 `org.apache.spark.sql.hive.HiveContext` instead.

 Cheng


 On 6/10/15 10:19 AM, James Pirz wrote:

 I am using Spark (standalone) to run queries (from a remote client)
 against data in tables that are already defined/loaded in Hive.

 I have started metastore service in Hive successfully, and by putting
 hive-site.xml, with proper metastore.uri, in $SPARK_HOME/conf directory, I
 tried to share its config with spark.

  When I start spark-shell, it gives me a default sqlContext, and I can
 use that to access my Hive's tables with no problem.

  But once I submit a similar query via a Spark application through
 'spark-submit', it does not see the tables and it seems it does not pick up
 hive-site.xml, which is under the conf directory in Spark's home. I tried to use
 the '--files' argument with spark-submit to pass hive-site.xml to the
 workers, but it did not change anything.

  Here is how I try to run the application:

  $SPARK_HOME/bin/spark-submit --class SimpleClient --master
 spark://my-spark-master:7077 --files=$SPARK_HOME/conf/hive-site.xml
  simple-sql-client-1.0.jar

  Here is the simple example code that I try to run (in Java):

  SparkConf conf = new SparkConf().setAppName("Simple SQL Client");

 JavaSparkContext sc = new JavaSparkContext(conf);

 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

 DataFrame res = sqlContext.sql("show tables");

 res.show();


  Here are the SW versions:
 Spark: 1.3
 Hive: 1.2
 Hadoop: 2.6

  Thanks in advance for any suggestion.





Re: Running SparkSql against Hive tables

2015-06-09 Thread James Pirz
Thanks Ayan, I used beeline in Spark to connect to the Hiveserver2 that I
started from my Hive. So as you said, it is really interacting with Hive as
a typical 3rd party application, and it is NOT using the Spark execution
engine. I was thinking that it gets metastore info from Hive, but uses
Spark to execute the query.

I have already created & loaded tables in Hive, and now I want to use Spark
to run SQL queries against those tables. I just want to submit SQL queries
in Spark, against the data in Hive, without writing an application (just
similar to the way that one would pass SQL scripts to Hive or Shark). Going
through the Spark documentation, I realized Spark SQL is the component that
I need to use. But do you mean I have to write a client Spark application
to do that? Is there any way that one could pass SQL scripts directly
through the command line & have Spark run them in distributed mode on the
cluster, against the already existing data in Hive?

On Mon, Jun 8, 2015 at 5:53 PM, ayan guha guha.a...@gmail.com wrote:

 I am afraid you are going the other way around :) If you want to use Hive in
 Spark, you'd need a HiveContext with the Hive config files in the Spark cluster
 (every node). This way Spark can talk to the Hive metastore. Then you can
 write queries on Hive tables using hiveContext's sql method and Spark will
 run them (either by reading from Hive and creating an RDD or by letting Hive
 run the query using MR). The final result will be a Spark DataFrame.

 What you are currently doing is using beeline to connect to Hive, which should
 work even without Spark.

 Best
 Ayan

 On Tue, Jun 9, 2015 at 10:42 AM, James Pirz james.p...@gmail.com wrote:

 Thanks for the help!
 I am actually trying Spark SQL to run queries against tables that I've
 defined in Hive.

 I follow these steps:
 - I start hiveserver2 and in Spark, I start Spark's Thrift server by:
 $SPARK_HOME/sbin/start-thriftserver.sh --master
 spark://spark-master-node-ip:7077

 - and I start beeline:
 $SPARK_HOME/bin/beeline

 - In my beeline session, I connect to my running hiveserver2
 !connect jdbc:hive2://hive-node-ip:1

 and I can run queries successfully. But based on the hiveserver2 logs, it
 seems it actually uses Hadoop's MR to run queries, *not* Spark's
 workers. My goal is to access Hive's tables' data, but run queries through
 Spark SQL using Spark workers (not Hadoop).

 Is it possible to do that via Spark SQL (its CLI) or through its thrift
 server ? (I tried to find some basic examples in the documentation, but I
 was not able to) - Any suggestion or hint on how I can do that would be
 highly appreciated.

 Thnx

 On Sun, Jun 7, 2015 at 6:39 AM, Cheng Lian lian.cs@gmail.com wrote:



 On 6/6/15 9:06 AM, James Pirz wrote:

 I am pretty new to Spark, and using Spark 1.3.1, I am trying to use
 'Spark SQL' to run some SQL scripts, on the cluster. I realized that for a
 better performance, it is a good idea to use Parquet files. I have 2
 questions regarding that:

  1) If I wanna use Spark SQL against *partitioned & bucketed* tables
 with Parquet format in Hive, does the provided spark binary on the apache
 website support that or do I need to build a new spark binary with some
 additional flags ? (I found a note
 https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
  in
 the documentation about enabling Hive support, but I could not fully get it
 as what the correct way of building is, if I need to build)

 Yes, Hive support is enabled by default now for the binaries on the
 website. However, currently Spark SQL doesn't support buckets yet.


  2) Does running Spark SQL against tables in Hive downgrade the
 performance, and it is better that I load parquet files directly to HDFS or
 having Hive in the picture is harmless ?

 If you're using Parquet, then it should be fine since by default Spark
 SQL uses its own native Parquet support to read Parquet Hive tables.


  Thnx






 --
 Best Regards,
 Ayan Guha



Re: Running SparkSql against Hive tables

2015-06-09 Thread James Pirz
I am trying to use Spark 1.3 (Standalone) against Hive 1.2 running on
Hadoop 2.6.
I looked at the ThriftServer2 logs, and I realized that the server was not
starting properly, because of a failure in creating a server socket. In fact,
I had passed the URI to my Hiveserver2 service, launched from Hive, and the
beeline in Spark was directly talking to Hive's hiveserver2 and it was just
using it as a Hive service.

I could fix starting the Thriftserver2 in Spark (by changing the port), but I
guess the missing puzzle piece for me is: How does Spark SQL re-use the
already created tables in Hive? I mean, do I have to write an application
that uses HiveContext to do that and submit it to Spark for execution, or
is there a way to run SQL scripts directly via the command line (in distributed
mode and on the cluster), similar to the way that one would use the Hive
(or Shark) command line by passing a query file with the -f flag? Looking at
the Spark SQL documentation, it seems that it is possible. Please correct
me if I am wrong.

On Mon, Jun 8, 2015 at 6:56 PM, Cheng Lian lian.cs@gmail.com wrote:


 On 6/9/15 8:42 AM, James Pirz wrote:

 Thanks for the help!
 I am actually trying Spark SQL to run queries against tables that I've
 defined in Hive.

  I follow these steps:
 - I start hiveserver2 and in Spark, I start Spark's Thrift server by:
 $SPARK_HOME/sbin/start-thriftserver.sh --master
 spark://spark-master-node-ip:7077

  - and I start beeline:
 $SPARK_HOME/bin/beeline

  - In my beeline session, I connect to my running hiveserver2
 !connect jdbc:hive2://hive-node-ip:1

  and I can run queries successfully. But based on the hiveserver2 logs, it
 seems it actually uses Hadoop's MR to run queries, *not* Spark's
 workers. My goal is to access Hive's tables' data, but run queries through
 Spark SQL using Spark workers (not Hadoop).

 Hm, interesting. HiveThriftServer2 should never issue MR jobs to perform
 queries. I did receive two reports in the past which also say MR jobs
 instead of Spark jobs were issued to perform the SQL query. However, I only
 reproduced this issue in a rare corner case, which uses HTTP mode to
 connect to Hive 0.12.0. Apparently this isn't your case. Would you mind
 providing more details so that I can dig in?  The following information would
 be very helpful:

 1. Hive version
 2. A copy of your hive-site.xml
 3. Hadoop version
 4. Full HiveThriftServer2 log (which can be found in $SPARK_HOME/logs)

 Thanks in advance!


  Is it possible to do that via Spark SQL (its CLI) or through its thrift
 server ? (I tried to find some basic examples in the documentation, but I
 was not able to) - Any suggestion or hint on how I can do that would be
 highly appreciated.

  Thnx

 On Sun, Jun 7, 2015 at 6:39 AM, Cheng Lian lian.cs@gmail.com wrote:



 On 6/6/15 9:06 AM, James Pirz wrote:

 I am pretty new to Spark, and using Spark 1.3.1, I am trying to use
 'Spark SQL' to run some SQL scripts, on the cluster. I realized that for a
 better performance, it is a good idea to use Parquet files. I have 2
 questions regarding that:

  1) If I wanna use Spark SQL against *partitioned & bucketed* tables
 with Parquet format in Hive, does the provided spark binary on the apache
 website support that or do I need to build a new spark binary with some
 additional flags ? (I found a note
 https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
  in
 the documentation about enabling Hive support, but I could not fully get it
 as what the correct way of building is, if I need to build)

  Yes, Hive support is enabled by default now for the binaries on the
 website. However, currently Spark SQL doesn't support buckets yet.


  2) Does running Spark SQL against tables in Hive downgrade the
 performance, and it is better that I load parquet files directly to HDFS or
 having Hive in the picture is harmless ?

  If you're using Parquet, then it should be fine since by default Spark
 SQL uses its own native Parquet support to read Parquet Hive tables.


  Thnx







spark-submit does not use hive-site.xml

2015-06-09 Thread James Pirz
I am using Spark (standalone) to run queries (from a remote client) against
data in tables that are already defined/loaded in Hive.

I have started metastore service in Hive successfully, and by putting
hive-site.xml, with proper metastore.uri, in $SPARK_HOME/conf directory, I
tried to share its config with spark.

When I start spark-shell, it gives me a default sqlContext, and I can use
that to access my Hive's tables with no problem.

But once I submit a similar query via a Spark application through
'spark-submit', it does not see the tables and it seems it does not pick up
hive-site.xml, which is under the conf directory in Spark's home. I tried to use
the '--files' argument with spark-submit to pass hive-site.xml to the
workers, but it did not change anything.

Here is how I try to run the application:

$SPARK_HOME/bin/spark-submit --class SimpleClient --master
spark://my-spark-master:7077 --files=$SPARK_HOME/conf/hive-site.xml
 simple-sql-client-1.0.jar

Here is the simple example code that I try to run (in Java):

SparkConf conf = new SparkConf().setAppName("Simple SQL Client");

JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame res = sqlContext.sql("show tables");

res.show();


Here are the SW versions:
Spark: 1.3
Hive: 1.2
Hadoop: 2.6

Thanks in advance for any suggestion.


Re: Running SparkSql against Hive tables

2015-06-08 Thread James Pirz
Thanks for the help!
I am actually trying Spark SQL to run queries against tables that I've
defined in Hive.

I follow these steps:
- I start hiveserver2 and in Spark, I start Spark's Thrift server by:
$SPARK_HOME/sbin/start-thriftserver.sh --master
spark://spark-master-node-ip:7077

- and I start beeline:
$SPARK_HOME/bin/beeline

- In my beeline session, I connect to my running hiveserver2
!connect jdbc:hive2://hive-node-ip:1

and I can run queries successfully. But based on the hiveserver2 logs, it seems
it actually uses Hadoop's MR to run queries, *not* Spark's workers. My
goal is to access Hive's tables' data, but run queries through Spark SQL
using Spark workers (not Hadoop).

Is it possible to do that via Spark SQL (its CLI) or through its thrift
server ? (I tried to find some basic examples in the documentation, but I
was not able to) - Any suggestion or hint on how I can do that would be
highly appreciated.

Thnx

On Sun, Jun 7, 2015 at 6:39 AM, Cheng Lian lian.cs@gmail.com wrote:



 On 6/6/15 9:06 AM, James Pirz wrote:

 I am pretty new to Spark, and using Spark 1.3.1, I am trying to use 'Spark
 SQL' to run some SQL scripts, on the cluster. I realized that for a better
 performance, it is a good idea to use Parquet files. I have 2 questions
 regarding that:

  1) If I wanna use Spark SQL against *partitioned & bucketed* tables
 with Parquet format in Hive, does the provided spark binary on the apache
 website support that or do I need to build a new spark binary with some
 additional flags ? (I found a note
 https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables 
 in
 the documentation about enabling Hive support, but I could not fully get it
 as what the correct way of building is, if I need to build)

 Yes, Hive support is enabled by default now for the binaries on the
 website. However, currently Spark SQL doesn't support buckets yet.


  2) Does running Spark SQL against tables in Hive downgrade the
 performance, and it is better that I load parquet files directly to HDFS or
 having Hive in the picture is harmless ?

 If you're using Parquet, then it should be fine since by default Spark SQL
 uses its own native Parquet support to read Parquet Hive tables.


  Thnx




