Re: dataset aggregators with kryo encoder very slow

2017-01-20 Thread Koert Kuipers
found it :) SPARK-1890
thanks cloud-fan

On Sat, Jan 21, 2017 at 1:46 AM, Koert Kuipers  wrote:

> trying to replicate this in spark itself i can for v2.1.0 but not for
> master. i guess it has been fixed
>
> On Fri, Jan 20, 2017 at 4:57 PM, Koert Kuipers  wrote:
>
>> i started printing out when kryo serializes my buffer data structure for
>> my aggregator.
>>
>> i would expect every buffer object to ideally get serialized only once:
>> at the end of the map-side before the shuffle (so after all the values for
>> the given key within the partition have been reduced into it). i realize
>> that in reality due to the order of the elements coming in this can not
>> always be achieved. but what i see instead is that the buffer is getting
>> serialized after every call to reduce a value into it, always. could this
>> be the reason it is so slow?
>>
>> On Thu, Jan 19, 2017 at 4:17 PM, Koert Kuipers  wrote:
>>
>>> we just converted a job from RDD to Dataset. the job does a single
>>> map-red phase using aggregators. we are seeing very bad performance for the
>>> Dataset version, about 10x slower.
>>>
>>> in the Dataset version we use kryo encoders for some of the aggregators.
>>> based on some basic profiling of spark in local mode i believe the bad
>>> performance is due to the kryo encoders. about 70% of time is spent in kryo
>>> related classes.
>>>
>>> since we also use kryo for serialization with the RDD i am surprised how
>>> big the performance difference is.
>>>
>>> has anyone seen the same thing? any suggestions for how to improve this?
>>>
>>>
>>
>
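For readers who land here later, a minimal sketch of the pattern under discussion — a typed Aggregator whose buffer is encoded with Encoders.kryo — assuming a simple Set[String] buffer (the buffer type and the sample job are illustrative, not the code from this thread):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Illustrative aggregator: collects distinct values per key into a Set buffer.
// The buffer is serialized with the Kryo encoder, which is what the thread
// observes being invoked after every reduce call rather than once per key.
object DistinctSet extends Aggregator[String, Set[String], Int] {
  def zero: Set[String] = Set.empty
  def reduce(buf: Set[String], value: String): Set[String] = buf + value
  def merge(b1: Set[String], b2: Set[String]): Set[String] = b1 ++ b2
  def finish(buf: Set[String]): Int = buf.size
  def bufferEncoder: Encoder[Set[String]] = Encoders.kryo[Set[String]]
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

// Usage sketch:
// val spark = SparkSession.builder().master("local[*]").getOrCreate()
// import spark.implicits._
// Seq("a", "b", "a").toDS().groupByKey(identity).agg(DistinctSet.toColumn).show()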


Re: dataset aggregators with kryo encoder very slow

2017-01-20 Thread Koert Kuipers
trying to replicate this in spark itself i can for v2.1.0 but not for
master. i guess it has been fixed

On Fri, Jan 20, 2017 at 4:57 PM, Koert Kuipers  wrote:

> i started printing out when kryo serializes my buffer data structure for
> my aggregator.
>
> i would expect every buffer object to ideally get serialized only once: at
> the end of the map-side before the shuffle (so after all the values for the
> given key within the partition have been reduced into it). i realize that
> in reality due to the order of the elements coming in this can not always
> be achieved. but what i see instead is that the buffer is getting
> serialized after every call to reduce a value into it, always. could this
> be the reason it is so slow?
>
> On Thu, Jan 19, 2017 at 4:17 PM, Koert Kuipers  wrote:
>
>> we just converted a job from RDD to Dataset. the job does a single
>> map-red phase using aggregators. we are seeing very bad performance for the
>> Dataset version, about 10x slower.
>>
>> in the Dataset version we use kryo encoders for some of the aggregators.
>> based on some basic profiling of spark in local mode i believe the bad
>> performance is due to the kryo encoders. about 70% of time is spent in kryo
>> related classes.
>>
>> since we also use kryo for serialization with the RDD i am surprised how
>> big the performance difference is.
>>
>> has anyone seen the same thing? any suggestions for how to improve this?
>>
>>
>


Re: Ingesting data in parallel across workers in Data Frame

2017-01-20 Thread Peyman Mohajerian
The next section in the same document has a solution.

On Fri, Jan 20, 2017 at 9:03 PM, Abhishek Gupta 
wrote:

> I am trying to load data from the database into a DataFrame using the JDBC
> driver. I want to get the data into partitions; the following document has a
> nice explanation of how to achieve this.
> https://docs.databricks.com/spark/latest/data-sources/sql-databases.html
>
> The
> problem I am facing is that I don't have a numeric column which can be used
> for partitioning.
>
> Any help would be appreciated.
>
>
> Thank You
>
> --Abhishek
>
>
>
>
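For readers without the linked doc handy, a rough sketch of the two usual approaches: a numeric partition column when one exists, or an explicit array of predicates when it doesn't (the JDBC URL, table and column names below are made up for illustration):

import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val props = new Properties()
props.setProperty("user", "dbuser")        // illustrative credentials
props.setProperty("password", "secret")

// 1) With a numeric column: Spark splits the [lowerBound, upperBound] range
//    into numPartitions range queries, one per partition.
val byColumn = spark.read.jdbc(
  "jdbc:postgresql://dbhost/dbname",       // hypothetical URL
  "orders", "order_id", 1L, 1000000L, 8, props)

// 2) Without a numeric column: hand Spark one WHERE clause per partition.
val predicates = Array(
  "created_date >= '2017-01-01' AND created_date < '2017-01-08'",
  "created_date >= '2017-01-08' AND created_date < '2017-01-15'")
val byPredicates = spark.read.jdbc(
  "jdbc:postgresql://dbhost/dbname", "orders", predicates, props)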


Ingesting data in parallel across workers in Data Frame

2017-01-20 Thread Abhishek Gupta
I am trying to load data from the database into a DataFrame using the JDBC
driver. I want to get the data into partitions; the following document has a
nice explanation of how to achieve this.
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

The
problem I am facing is that I don't have a numeric column which can be used
for partitioning.

Any help would be appreciated.


Thank You

--Abhishek


Kryo (with Spark 1.6.3) class registration slows down processing

2017-01-20 Thread N B
Hello,

Here is something I am unable to explain, and it goes against Kryo's
documentation, numerous suggestions on the web and on this list, as well as
pure intuition.

Our Spark application runs in a single JVM (perhaps this is relevant, hence
mentioning it). We have been using Kryo serialization with Spark (setting
the spark.serializer property to
org.apache.spark.serializer.KryoSerializer) without explicitly registering
classes and everything seems to work well enough. Recently, I have been
looking into making some performance improvements and decided to register
classes.

I turned on the "spark.kryo.registrationRequired" property and started to
register all classes as they were reported by the resulting Exceptions.
Eventually I managed to register them all. BTW, there is a fairly large
number of internal Spark and Scala classes that I also had to register, but
that's beside the point here.

I was hoping to gain some performance improvement as per the suggestions of
registering classes. However, what I saw was the exact opposite and
surprising. Performance (throughput) actually deteriorated by at least a
factor of 50%. I turned off the registrationRequired property but kept the
explicit registrations in place with the same result. Then I reduced the
number of registrations and performance started to get better again.
Eventually I got rid of all the explicit registrations (back to where I
started basically) and performance improved back to where it was.

I am unable to explain why I am observing this behavior as this is
counter-intuitive. Explicit registration is supposed to write smaller
amount of data (class names as Strings vs just class Ids as integers) and
hence help improve performance. Is the fact that Spark is running in local
mode (single JVM) a factor here? Any insights will be appreciated.

Thanks
NB
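For reference, the registration being described looks roughly like the sketch below (the registered classes are placeholders for the application's own); the observation in this thread is that in local mode this did not help and in fact hurt throughput:

import org.apache.spark.SparkConf

case class MyEvent(id: Long, payload: String)   // placeholder application class
case class MyAggregate(count: Long)             // placeholder application class

// Sketch of explicit Kryo registration as described above.
val conf = new SparkConf()
  .setAppName("kryo-registration-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true") // fail fast on unregistered classes
  .registerKryoClasses(Array(
    classOf[MyEvent],
    classOf[MyAggregate],
    classOf[Array[MyEvent]]   // array types often need registering too
  ))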


Force mesos to provide GPUs to Spark

2017-01-20 Thread Ji Yan
Dear Spark Users,

With the latest version of Spark and Mesos with GPU support, is there a way
to guarantee a Spark job with specified number of GPUs? Currently the Spark
job sets "spark.mesos.gpus.max" to ask for GPU resources, however this is
an upper bound, which means that Spark will accept a Mesos offer even if no
GPU is available in the offer. Can we make this explicit, so as to guarantee GPU
resource offers like other resources, i.e. CPU/Mem?

Thanks
Ji
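For context, the setting in question is passed like any other Spark conf; as noted above it only caps the GPUs accepted from Mesos offers rather than guaranteeing them (the master URL below is a placeholder):

import org.apache.spark.sql.SparkSession

// Sketch: requesting GPUs on Mesos. Per the thread, spark.mesos.gpus.max is an
// upper bound on GPUs taken from offers, not a guaranteed minimum.
val spark = SparkSession.builder()
  .appName("gpu-job")
  .master("mesos://zk://mesos-master:2181/mesos")  // placeholder master URL
  .config("spark.mesos.gpus.max", "4")
  .getOrCreate()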



Re: Dataframe caching

2017-01-20 Thread रविशंकर नायर
Thanks, Will look into this.

Best regards,
Ravion
-- Forwarded message --
From: "Muthu Jayakumar" 
Date: Jan 20, 2017 10:56 AM
Subject: Re: Dataframe caching
To: "☼ R Nair (रविशंकर नायर)" 
Cc: "user@spark.apache.org" 

I guess, this may help in your case?

https://spark.apache.org/docs/latest/sql-programming-guide.h
tml#global-temporary-view

Thanks,
Muthu

On Fri, Jan 20, 2017 at 6:27 AM, ☼ R Nair (रविशंकर नायर) <
ravishankar.n...@gmail.com> wrote:

> Dear all,
>
> Here is a requirement I am thinking of implementing in Spark core. Please
> let me know if this is possible, and kindly provide your thoughts.
>
> A user executes a query to fetch 1 million records from, let's say, a
> database. We let the user store this as a dataframe, partitioned across
> the cluster.
>
> Another user executes the same query from another session. Is there
> any way that we can let the second user reuse the dataframe created by the
> first user?
>
> Can we have a master dataframe (or RDD) which stores the information about
> the current dataframes loaded and matches against any queries that are
> coming from other users?
>
> In this way, we will have a wonderful system which never allows same query
> to be executed and loaded again into the cluster memory.
>
> Best, Ravion
>
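A rough sketch of the global-temporary-view suggestion. Note it shares a cached DataFrame across sessions of the same Spark application (Spark 2.1+), which is the closest built-in mechanism; it does not span independently submitted applications. Source options and names below are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("shared-df").getOrCreate()

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost/dbname")  // hypothetical source
  .option("dbtable", "big_table")
  .load()
  .cache()

// Registered in the reserved `global_temp` database, visible to other sessions
// of the same application.
df.createGlobalTempView("big_table_cached")

// A second session in the same application can reuse it:
val other = spark.newSession()
val reused = other.sql("SELECT count(*) FROM global_temp.big_table_cached")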


Re: java.io.InvalidClassException: org.apache.spark.executor.TaskMetrics

2017-01-20 Thread kant kodali
nvm figured. I compiled my client jar with 2.0.2 while the spark that is
deployed on my machines was 2.0.1. communication problems between dev team
and ops team :)

On Fri, Jan 20, 2017 at 3:03 PM, kant kodali  wrote:

> Is this because of versioning issue? can't wait for JDK 9 modular system.
> I am not sure if spark plans to leverage it?
>
> On Fri, Jan 20, 2017 at 1:30 PM, kant kodali  wrote:
>
>> I get the following exception. I am using Spark 2.0.1 and Scala 2.11.8.
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 0.0 (TID 13, 172.31.20.212): java.io.InvalidClassException:
>> org.apache.spark.executor.TaskMetrics; local class incompatible: stream
>> classdesc serialVersionUID = -2231953621568687904, local class
>> serialVersionUID = -6966587383730940799
>>
>
>
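The resolution above (a client jar built against 2.0.2 talking to a 2.0.1 cluster) comes down to keeping the build's Spark version in lock-step with the deployment. A minimal build.sbt sketch of that convention, with the versions from this thread:

// build.sbt sketch: pin the Spark version to what the cluster actually runs,
// and mark it "provided" so the cluster's own jars are used at runtime.
val sparkVersion = "2.0.1"   // must match the deployed cluster

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"       % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
)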


Re: java.io.InvalidClassException: org.apache.spark.executor.TaskMetrics

2017-01-20 Thread kant kodali
Is this because of versioning issue? can't wait for JDK 9 modular system. I
am not sure if spark plans to leverage it?

On Fri, Jan 20, 2017 at 1:30 PM, kant kodali  wrote:

> I get the following exception. I am using Spark 2.0.1 and Scala 2.11.8.
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 0.0 (TID 13, 172.31.20.212): java.io.InvalidClassException:
> org.apache.spark.executor.TaskMetrics; local class incompatible: stream
> classdesc serialVersionUID = -2231953621568687904, local class
> serialVersionUID = -6966587383730940799
>


Re: dataset aggregators with kryo encoder very slow

2017-01-20 Thread Koert Kuipers
i started printing out when kryo serializes my buffer data structure for my
aggregator.

i would expect every buffer object to ideally get serialized only once: at
the end of the map-side before the shuffle (so after all the values for the
given key within the partition have been reduced into it). i realize that
in reality due to the order of the elements coming in this can not always
be achieved. but what i see instead is that the buffer is getting
serialized after every call to reduce a value into it, always. could this
be the reason it is so slow?

On Thu, Jan 19, 2017 at 4:17 PM, Koert Kuipers  wrote:

> we just converted a job from RDD to Dataset. the job does a single map-red
> phase using aggregators. we are seeing very bad performance for the Dataset
> version, about 10x slower.
>
> in the Dataset version we use kryo encoders for some of the aggregators.
> based on some basic profiling of spark in local mode i believe the bad
> performance is due to the kryo encoders. about 70% of time is spent in kryo
> related classes.
>
> since we also use kryo for serialization with the RDD i am surprised how
> big the performance difference is.
>
> has anyone seen the same thing? any suggestions for how to improve this?
>
>


Re:

2017-01-20 Thread Keith Chapman
Hi Jacek,

I've looked at SparkListener and tried it, I see it getting fired on the
master but I don't see it getting fired on the workers in a cluster.

Regards,
Keith.

http://keith-chapman.com

On Fri, Jan 20, 2017 at 11:09 AM, Jacek Laskowski  wrote:

> Hi,
>
> (redirecting to users as it has nothing to do with Spark project
> development)
>
> Monitor jobs and stages using SparkListener and submit cleanup jobs where
> a condition holds.
>
> Jacek
>
> On 20 Jan 2017 3:57 a.m., "Keith Chapman"  wrote:
>
>> Hi ,
>>
>> Is it possible for an executor (or slave) to know when an actual job
>> ends? I'm running spark on a cluster (with yarn) and my workers create some
>> temporary files that I would like to clean up once the job ends. Is there a
>> way for the worker to detect that a job has finished? I tried doing it in
>> the JobProgressListener but it does not seem to work in a cluster. The
>> event is not triggered in the worker.
>>
>> Regards,
>> Keith.
>>
>> http://keith-chapman.com
>>
>
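For reference, a rough sketch of the SparkListener suggestion. As Keith observes, listeners fire on the driver, so the worker-side cleanup has to be pushed out as a small job of its own; the scratch directory below is a placeholder, and the cleanup job only reaches the executors that happen to receive its tasks:

import scala.collection.concurrent.TrieMap
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Runs on the driver. The "cleanup" job group keeps the listener from
// reacting to the cleanup jobs it submits itself.
class TempFileCleanupListener(sc: SparkContext) extends SparkListener {
  private val groups = TrieMap.empty[Int, String]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val group = Option(jobStart.properties)
      .map(_.getProperty("spark.jobGroup.id", "")).getOrElse("")
    groups.put(jobStart.jobId, group)
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    if (!groups.remove(jobEnd.jobId).contains("cleanup")) {
      sc.setJobGroup("cleanup", "delete worker temp files")
      sc.parallelize(0 until sc.defaultParallelism, sc.defaultParallelism)
        .foreach { _ =>
          // placeholder: wipe this worker's scratch directory
          Option(new java.io.File("/tmp/myapp-scratch").listFiles())
            .foreach(_.foreach(_.delete()))
        }
      sc.clearJobGroup()
    }
  }
}

// Registration: sc.addSparkListener(new TempFileCleanupListener(sc))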


java.io.InvalidClassException: org.apache.spark.executor.TaskMetrics

2017-01-20 Thread kant kodali
I get the following exception. I am using Spark 2.0.1 and Scala 2.11.8.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
0.0 (TID 13, 172.31.20.212): java.io.InvalidClassException:
org.apache.spark.executor.TaskMetrics; local class incompatible: stream
classdesc serialVersionUID = -2231953621568687904, local class
serialVersionUID = -6966587383730940799


Re: New runtime exception after switch to Spark 2.1.0

2017-01-20 Thread Jacek Laskowski
Thanks for sharing! A very interesting reading indeed.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Jan 20, 2017 at 10:17 PM, Morten Hornbech  wrote:
> Sure :-)
>
> Digging into the TargetInvocationException revealed a “NoSuchFieldError:
> DEFAULT_MAX_PENDING_TASKS", which we guessed was linked to some kind of
> binary incompatibility in the dependencies. Looking into the stack trace
> this could be traced to a dynamic constructor call in netty, and we could
> see that Spark's netty-all dependency had been lifted from 4.0.29 to 4.0.42.
> At the same time we have a dependency on netty-transport 4.0.37 due to our
> use of the Cassandra Java Driver, and those classes are also in netty-all.
> Looking into the versions we identified that these were indeed not binary
> compatible - a field had been removed as the original error said. Since the
> artifacts are different they can co-exist on the classpath, but of course
> not at runtime, and we then get the error when incompatible versions are
> loaded. Interestingly we could not reproduce this on our laptops - only on
> our buildserver - so there must be some random difference in classloader
> behaviour.
>
> Dependency issues are the typical problem we have when taking in a new
> version of Spark and they always take some time to find out. We had a lot of
> issues with Guava when taking in 2.0 and ended up shading it when no other
> option was available. It would be really nice if the version bumps were
> included in the release notes.
>
> Morten
>
>
> Den 20. jan. 2017 kl. 20.21 skrev Jacek Laskowski :
>
> Hi,
>
> I'd be very interested in how you figured it out. Mind sharing?
>
> Jacek
>
> On 18 Jan 2017 9:51 p.m., "mhornbech"  wrote:
>>
>> For anyone revisiting this at a later point, the issue was that Spark
>> 2.1.0
>> upgrades netty to version 4.0.42 which is not binary compatible with
>> version
>> 4.0.37 used by version 3.1.0 of the Cassandra Java Driver. The newer
>> version
>> can work with Cassandra, but because of differences in the maven artifacts
>> (Spark depends on netty-all while Cassandra depends on netty-transport)
>> this
>> was not automatically resolved by SBT. Adding an explicit dependency to
>> netty-transport version 4.0.42 solved the problem.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/New-runtime-exception-after-switch-to-Spark-2-1-0-tp28263p28319.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



is this something to worry about? HADOOP_HOME or hadoop.home.dir are not set

2017-01-20 Thread kant kodali
Hi,

I am running spark standalone with no storage. when I use spark-submit to
submit my job I get the following Exception and I wonder if this is
something to worry about?

*java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set*


Re: New runtime exception after switch to Spark 2.1.0

2017-01-20 Thread Morten Hornbech
Sure :-)

Digging into the TargetInvocationException revealed a “NoSuchFieldError: 
DEFAULT_MAX_PENDING_TASKS", which we guessed was linked to some kind of binary 
incompatibility in the dependencies. Looking into the stack trace this could be 
traced to a dynamic constructor call in netty, and we could see that Spark's 
netty-all dependency had been lifted from 4.0.29 to 4.0.42. At the same time we 
have a dependency on netty-transport 4.0.37 due to our use of the Cassandra 
Java Driver, and those classes are also in netty-all. Looking into the versions 
we identified that these were indeed not binary compatible - a field had been 
removed as the original error said. Since the artifacts are different they can 
co-exist on the classpath, but of course not at runtime, and we then get the 
error when incompatible versions are loaded. Interestingly we could not 
reproduce this on our laptops - only on our buildserver - so there must be some 
random difference in classloader behaviour.

Dependency issues are the typical problem we have when taking in a new version 
of Spark and they always take some time to find out. We had a lot of issues 
with Guava when taking in 2.0 and ended up shading it when no other option was 
available. It would be really nice if the version bumps were included in the 
release notes.

Morten


> Den 20. jan. 2017 kl. 20.21 skrev Jacek Laskowski :
> 
> Hi, 
> 
> I'd be very interested in how you figured it out. Mind sharing? 
> 
> Jacek 
> 
> On 18 Jan 2017 9:51 p.m., "mhornbech"  > wrote:
> For anyone revisiting this at a later point, the issue was that Spark 2.1.0
> upgrades netty to version 4.0.42 which is not binary compatible with version
> 4.0.37 used by version 3.1.0 of the Cassandra Java Driver. The newer version
> can work with Cassandra, but because of differences in the maven artifacts
> (Spark depends on netty-all while Cassandra depends on netty-transport) this
> was not automatically resolved by SBT. Adding an explicit dependency to
> netty-transport version 4.0.42 solved the problem.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/New-runtime-exception-after-switch-to-Spark-2-1-0-tp28263p28319.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
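For anyone hitting the same thing, the fix described above amounts to a one-line override in the build. A build.sbt sketch with the versions from this thread (the exact artifact coordinates, e.g. the .Final suffix, are an assumption about the netty release naming):

// build.sbt sketch: force netty-transport up to the version Spark 2.1.0's
// netty-all expects, so both artifacts agree at runtime.
libraryDependencies ++= Seq(
  "org.apache.spark"       %% "spark-core"            % "2.1.0" % "provided",
  "com.datastax.cassandra" %  "cassandra-driver-core" % "3.1.0",
  "io.netty"               %  "netty-transport"       % "4.0.42.Final"  // explicit override per the thread
)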



Fwd: Spark streaming app that processes Kafka DStreams produces no output and no error

2017-01-20 Thread Shixiong(Ryan) Zhu
-- Forwarded message --
From: Shixiong(Ryan) Zhu 
Date: Fri, Jan 20, 2017 at 12:06 PM
Subject: Re: Spark streaming app that processes Kafka DStreams produces no
output and no error
To: shyla deshpande 


That's how KafkaConsumer works right now. It will retry forever for network
errors. See https://issues.apache.org/jira/browse/KAFKA-1894

On Thu, Jan 19, 2017 at 8:16 PM, shyla deshpande 
wrote:

> There was an issue connecting to Kafka; once that was fixed the spark app
> works. Hope this helps someone.
> Thanks
>
> On Mon, Jan 16, 2017 at 7:58 AM, shyla deshpande  > wrote:
>
>> Hello,
>> I checked the log file on the worker node and don't see any error there.
>> This is the first time I am asked to run on such a small cluster. I feel
>> it's a resources issue, but it would be a great help if somebody can confirm
>> this or share your experience. Thanks
>>
>> On Sat, Jan 14, 2017 at 4:01 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I want to add that,
>>> I don't even see the streaming tab in the application UI on port 4040
>>> when I run it on the cluster.
>>> The cluster on EC2  has 1 master node and 1 worker node.
>>> The cores used on the worker node is 2 of 2 and memory used is 6GB of
>>> 6.3GB.
>>>
>>> Can I run a spark streaming job with just 2 cores?
>>>
>>> Appreciate your time and help.
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jan 13, 2017 at 10:46 PM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
 Hello,

 My spark streaming app that reads kafka topics and prints the DStream
 works fine on my laptop, but on AWS cluster it produces no output and no
 errors.

 Please help me debug.

 I am using Spark 2.0.2 and kafka-0-10

 Thanks

 The following is the output of the spark streaming app...


 17/01/14 06:22:41 WARN NativeCodeLoader: Unable to load native-hadoop 
 library for your platform... using builtin-java classes where applicable
 17/01/14 06:22:43 WARN Checkpoint: Checkpoint directory check1 does not 
 exist
 Creating new context
 17/01/14 06:22:45 WARN SparkContext: Use an existing SparkContext, some 
 configuration may not take effect.
 17/01/14 06:22:45 WARN KafkaUtils: overriding enable.auto.commit to false 
 for executor
 17/01/14 06:22:45 WARN KafkaUtils: overriding auto.offset.reset to none 
 for executor
 17/01/14 06:22:45 WARN KafkaUtils: overriding executor group.id to 
 spark-executor-whilDataStream
 17/01/14 06:22:45 WARN KafkaUtils: overriding receive.buffer.bytes to 
 65536 see KAFKA-3135



>>>
>>
>


Re: spark 2.02 error when writing to s3

2017-01-20 Thread Neil Jonkers
Can you test by enabling EMRFS consistent view and using the s3:// URI?

http://docs.aws.amazon.com/emr/latest/ManagementGuide/enable-consistent-view.html

 Original message From: Steve Loughran 
 Date:20/01/2017  21:17  (GMT+02:00) 
To: "VND Tremblay, Paul"  Cc: 
Takeshi Yamamuro ,user@spark.apache.org 
Subject: Re: spark 2.02 error when writing to s3 
AWS S3 is eventually consistent: even after something is deleted, a 
LIST/GET call may show it. You may be seeing that effect; even after the DELETE 
has got rid of the files, a listing sees something there, And I suspect the 
time it takes for the listing to "go away" will depend on the total number of 
entries underneath, as there are more deletion markers "tombstones" to 
propagate around s3

Try deleting the path and then waiting a short period


On 20 Jan 2017, at 18:54, VND Tremblay, Paul  wrote:

I am using an EMR cluster, and the latest version offered is 2.02. The link 
below indicates that that user had the same problem, which seems unresolved.
 
Thanks
 
Paul
 
_

Paul Tremblay 
Analytics Specialist 
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Takeshi Yamamuro [mailto:linguin@gmail.com] 
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3
 
Hi,
 
Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html
 
// maropu
 
 
On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul  
wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.
 
19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv
 
 
My code is this:
 
new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )
 
In order to get the path (the last argument), I call this function:
 
def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))
 
In other words, I am removing the directory if it exists before I write. 
 
Notes:
 
* If I use a small set of data, then I don't get the error
 
* If I use Spark 1.6, I don't get the error
 
* If I read in a simple dataframe and then write to S3, I still get the error 
(without doing any transformations)
 
* If I do the previous step with a smaller set of data, I don't get the error.
 
* I am using pyspark, with python 2.7
 
* The thread at this link: 
https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the 
problem is caused by a sync problem. With large datasets, spark tries 
to write multiple times and causes the error. The suggestion is to turn off 
speculation, but I believe speculation is turned off by default in pyspark.
 
Thanks!
 
Paul
 
 
_

Paul Tremblay 
Analytics Specialist 

THE BOSTON CONSULTING GROUP
STL ▪ 

Tel. + ▪ Mobile +
tremblay.p...@bcg.com
_

Read BCG's latest insights, analysis, and viewpoints at bcgperspectives.com
 



 
-- 
---
Takeshi Yamamuro



Re: New runtime exception after switch to Spark 2.1.0

2017-01-20 Thread Jacek Laskowski
Hi,

I'd be very interested in how you figured it out. Mind sharing?

Jacek

On 18 Jan 2017 9:51 p.m., "mhornbech"  wrote:

> For anyone revisiting this at a later point, the issue was that Spark 2.1.0
> upgrades netty to version 4.0.42 which is not binary compatible with
> version
> 4.0.37 used by version 3.1.0 of the Cassandra Java Driver. The newer
> version
> can work with Cassandra, but because of differences in the maven artifacts
> (Spark depends on netty-all while Cassandra depends on netty-transport)
> this
> was not automatically resolved by SBT. Adding an explicit dependency to
> netty-transport version 4.0.42 solved the problem.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/New-runtime-exception-after-switch-to-Spark-2-1-0-
> tp28263p28319.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: spark 2.02 error when writing to s3

2017-01-20 Thread Steve Loughran
AWS S3 is eventually consistent: even after something is deleted, a LIST/GET 
call may show it. You may be seeing that effect; even after the DELETE has got 
rid of the files, a listing sees something there, And I suspect the time it 
takes for the listing to "go away" will depend on the total number of entries 
underneath, as there are more deletion markers "tombstones" to propagate around 
s3

Try deleting the path and then waiting a short period


On 20 Jan 2017, at 18:54, VND Tremblay, Paul 
mailto:tremblay.p...@bcg.com>> wrote:

I am using an EMR cluster, and the latest version offered is 2.02. The link 
below indicates that that user had the same problem, which seems unresolved.

Thanks

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu


On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul 
mailto:tremblay.p...@bcg.com>> wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.


19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv



My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:

* If I use a small set of data, then I don't get the error

* If I use Spark 1.6, I don't get the error

* If I read in a simple dataframe and then write to S3, I still get the error 
(without doing any transformations)

* If I do the previous step with a smaller set of data, I don't get the error.

* I am using pyspark, with python 2.7

* The thread at this link: 
https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the 
problem is caused by a sync problem. With large datasets, spark tries 
to write multiple times and causes the error. The suggestion is to turn off 
speculation, but I believe speculation is turned off by default in pyspark.

Thanks!

Paul


_

Paul Tremblay
Analytics Specialist

THE BOSTON CONSULTING GROUP
STL ▪

Tel. + ▪ Mobile +
tremblay.p...@bcg.com
_

Read BCG's latest insights, analysis, and viewpoints at 
bcgperspectives.com






--
---
Takeshi Yamamuro



Re:

2017-01-20 Thread Jacek Laskowski
Hi,

(redirecting to users as it has nothing to do with Spark project
development)

Monitor jobs and stages using SparkListener and submit cleanup jobs where a
condition holds.

Jacek

On 20 Jan 2017 3:57 a.m., "Keith Chapman"  wrote:

> Hi ,
>
> Is it possible for an executor (or slave) to know when an actual job ends?
> I'm running spark on a cluster (with yarn) and my workers create some
> temporary files that I would like to clean up once the job ends. Is there a
> way for the worker to detect that a job has finished? I tried doing it in
> the JobProgressListener but it does not seem to work in a cluster. The
> event is not triggered in the worker.
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>


RE: spark 2.02 error when writing to s3

2017-01-20 Thread VND Tremblay, Paul
I am using an EMR cluster, and the latest version offered is 2.02. The link 
below indicates that that user had the same problem, which seems unresolved.

Thanks

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu


On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul 
mailto:tremblay.p...@bcg.com>> wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.


19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv


My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:

* If I use a small set of data, then I don't get the error

* If I use Spark 1.6, I don't get the error

* If I read in a simple dataframe and then write to S3, I still get the error 
(without doing any transformations)

* If I do the previous step with a smaller set of data, I don't get the error.

* I am using pyspark, with python 2.7

* The thread at this link: 
https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the 
problem is caused by a sync problem. With large datasets, spark tries 
to write multiple times and causes the error. The suggestion is to turn off 
speculation, but I believe speculation is turned off by default in pyspark.

Thanks!

Paul


_

Paul Tremblay
Analytics Specialist

THE BOSTON CONSULTING GROUP
STL ▪

Tel. + ▪ Mobile +
tremblay.p...@bcg.com
_

Read BCG's latest insights, analysis, and viewpoints at 
bcgperspectives.com






--
---
Takeshi Yamamuro


FunctionRegistry

2017-01-20 Thread Bowden, Chris
Thoughts on exposing FunctionRegistry via ExperimentalMethods?


I have functionality which can not be expressed efficiently via UDFs, 
consequently I implement my own Expressions. Currently I have to lift access to 
FunctionRegistry in my project(s) within org.apache.spark.sql.*. I also have to 
duplicate a number of functions to reconstruct FunctionRegistry.expression's 
behavior.


If we allow public modification of strategies and optimizations is there much 
risk in allowing direct access to register Expressions that can't be succinctly 
or efficiently represented as high level UDFs?


too noisy

2017-01-20 Thread Alvin Chen
hi, this mailing list is too noisy. is there another one i can sign up for
that only includes releases and announcements?

thanks,
Alvin


Re: Differing triplet and vertex data

2017-01-20 Thread lbollar
Sorry, wrong link above.

http://apache-spark-user-list.1001560.n3.nabble.com/Differing-triplet-and-vertex-data-td28330.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Differing-triplet-and-vertex-data-tp28330p28331.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Differing triplet and vertex data

2017-01-20 Thread lbollar
Hello all,

Found this previous post, which appears related, but didn't see an answer.

http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=search_page&node=1&query=triplet+vertex


I have a graph algorithm in spark 1.6.2 where I am implementing Louvain
Modularity. The implementation has the same premise that
PageRank.runWithOptions does, in that it has a graph as a variable,
iterates, and overwrites this var from time to time and I have followed some
of their techniques there for materializing the graph and unpersisting. 

One peculiar thing though is that after having iterated and constructing the
new graph, if I traverse the triplets and print each one, my vertices match
the original unaltered graph the algo started with. If I map over the
vertices only, they have been changed to their new updated values. Logging
inserted into the method shows the same behavior. 

In order to actually have the new triplets materialize, I have to overwrite
the graph with a call to a new constructor using graph =
Graph(graph.vertices, graph.edges) which seems a bit hacky. 

Is this normal or expected behavior where triplet vertex data would differ
from vertex data in some circumstances?

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Differing-triplet-and-vertex-data-tp28330.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dataframe caching

2017-01-20 Thread Muthu Jayakumar
I guess, this may help in your case?

https://spark.apache.org/docs/latest/sql-programming-guide.html#global-temporary-view

Thanks,
Muthu

On Fri, Jan 20, 2017 at 6:27 AM, ☼ R Nair (रविशंकर नायर) <
ravishankar.n...@gmail.com> wrote:

> Dear all,
>
> Here is a requirement I am thinking of implementing in Spark core. Please
> let me know if this is possible, and kindly provide your thoughts.
>
> A user executes a query to fetch 1 million records from, let's say, a
> database. We let the user store this as a dataframe, partitioned across
> the cluster.
>
> Another user executes the same query from another session. Is there
> any way that we can let the second user reuse the dataframe created by the
> first user?
>
> Can we have a master dataframe (or RDD) which stores the information about
> the current dataframes loaded and matches against any queries that are
> coming from other users?
>
> In this way, we will have a wonderful system which never allows same query
> to be executed and loaded again into the cluster memory.
>
> Best, Ravion
>


Dataframe caching

2017-01-20 Thread रविशंकर नायर
Dear all,

Here is a requirement I am thinking of implementing in Spark core. Please
let me know if this is possible, and kindly provide your thoughts.

A user executes a query to fetch 1 million records from, let's say, a
database. We let the user store this as a dataframe, partitioned across
the cluster.

Another user executes the same query from another session. Is there
any way that we can let the second user reuse the dataframe created by the
first user?

Can we have a master dataframe (or RDD) which stores the information about
the current dataframes loaded and matches against any queries that are
coming from other users?

In this way, we will have a wonderful system which never allows same query
to be executed and loaded again into the cluster memory.

Best, Ravion


Running Hive Beeline .hql file in Spark

2017-01-20 Thread Ravi Prasad
Hi ,
  Currently we are running Hive Beeline queries as below.
Beeline:
beeline -u "jdbc:hive2://localhost:1/default;principal=hive/_
h...@nsroot.net"  --showHeader=false --silent=true --outputformat=dsv
--verbose=false -f /home/sample.hql > output_partition.txt

Note: We run the Hive queries in sample.hql and redirect the output to the
output file output_partition.txt.

Spark:
Can anyone tell us how to implement this in Spark SQL, i.e. executing
the .hql file and redirecting the output to one file.

-- 
--
Regards,
Prasad T
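Two common approaches, sketched under the assumption of a Hive-enabled Spark build: the spark-sql CLI accepts -f sample.hql and its output can be redirected the same way as beeline's, or the statements can be run programmatically and written out, e.g.:

import org.apache.spark.sql.SparkSession
import scala.io.Source

// Sketch: run each statement from an .hql file through a Hive-enabled
// SparkSession and write the final result to a single pipe-separated file.
// Splitting on ';' is naive and assumed good enough for simple scripts.
val spark = SparkSession.builder()
  .appName("run-hql")
  .enableHiveSupport()
  .getOrCreate()

val statements = Source.fromFile("/home/sample.hql").mkString
  .split(";").map(_.trim).filter(_.nonEmpty)

val results = statements.map(spark.sql)

// Mirror the dsv output of the beeline example: pipe-separated, no header.
results.last
  .coalesce(1)
  .write
  .option("sep", "|")
  .option("header", "false")
  .csv("/tmp/output_partition")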


Re: Saving from Dataset to Bigquery Table

2017-01-20 Thread Jörn Franke
Sure, but PairRDD is part of the Spark libraries and should work in the cloud.

> On 20 Jan 2017, at 13:24, A Shaikh  wrote:
> 
> Thanks for responding Jorn. Currently I upload the jar to Google Cloud and 
> run my job, which is not ideal for development. Do you know if we can run this from 
> within our local machine? given that all the required jars are downloaded by 
> SBT anyways.
> 
>> On 20 January 2017 at 11:22, Jörn Franke  wrote:
>> It is only on pairdd
>> 
>>> On 20 Jan 2017, at 11:54, A Shaikh  wrote:
>>> 
>>> Has anyone experience saving Dataset to Bigquery Table?
>>> 
>>> I am loading into BigQuery using the following example sucessfully. This 
>>> uses RDD.saveAsNewAPIHadoopDataset method to save data. 
>>> I am using Dataset(or DataFrame) and looking for saveAsNewAPIHadoopDataset 
>>> method but unable to find it.
>>> 
> 
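To make the point concrete: saveAsNewAPIHadoopDataset is defined on pair RDDs (PairRDDFunctions), so a Dataset or DataFrame has to be turned into a key/value RDD first. A rough sketch only — the Hadoop Configuration setup and the key/value types the BigQuery connector's OutputFormat expects are left as assumptions to be checked against the connector docs:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.spark.sql.DataFrame

// Sketch only: hadoopConf must carry the BigQuery connector's OutputFormat,
// project/dataset/table, etc., per the connector documentation. The
// (NullWritable, JSON string) value shape is an assumption and may need
// adapting to what the connector's OutputFormat actually expects.
def saveToBigQuery(df: DataFrame, hadoopConf: Configuration): Unit = {
  val pairs = df.toJSON.rdd.map(json => (NullWritable.get(), json))
  pairs.saveAsNewAPIHadoopDataset(hadoopConf)  // available because `pairs` is a pair RDD
}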


Re: Saving from Dataset to Bigquery Table

2017-01-20 Thread A Shaikh
Thanks for responding Jorn. Currently I upload the jar to Google Cloud and
run my job, which is not ideal for development. Do you know if we can run this from
within our local machine? given that all the required jars are downloaded
by SBT anyways.

On 20 January 2017 at 11:22, Jörn Franke  wrote:

> It is only on pairdd
>
> On 20 Jan 2017, at 11:54, A Shaikh  wrote:
>
> Has anyone experience saving Dataset to Bigquery Table?
>
> I am loading into BigQuery using the following example
> sucessfully.
> This uses RDD.saveAsNewAPIHadoopDataset method to save data.
> I am using Dataset(or DataFrame) and looking for saveAsNewAPIHadoopDataset
> method but unable to find it.
>
>


help,I want to call spark-submit from java shell

2017-01-20 Thread lk_spark
hi, all:
   under Spark 2.0 with Hadoop 2.7.2
   my code is like this:
   
   String c1 = "/bin/sh";
   String c2 = "-c";
   StringBuilder sb = new StringBuilder("cd 
/home/hadoop/dmp/spark-2.0.2-bin-hadoop2.7/bin;spark-submit --class 
com.hua.spark.dataload.DataLoadFromBase64JSON --master yarn --deploy-mode 
client  /home/hadoop/dmp/dataload-1.0-SNAPSHOT-jar-with-dependencies.jar ");
   Process pro = Runtime.getRuntime().exec(new String[]{c1,c2,sb.toString()});
   pro.waitFor();

  on the same node, I can execute the command successfully from a terminal, but in java I 
got this error:


17/01/20 06:39:05 ERROR TransportChannelHandler: Connection to 
/192.168.0.136:51197 has been quiet for 12 ms while there are outstanding 
requests. Assuming connection is dead; please adjust spark.network.timeout if 
this is wrong.
17/01/20 06:39:05 ERROR TransportResponseHandler: Still have 1 requests 
outstanding when connection from /192.168.0.136:51197 is closed
17/01/20 06:39:05 WARN NettyRpcEnv: Ignored failure: java.io.IOException: 
Connection from /192.168.0.136:51197 closed
17/01/20 06:39:05 ERROR CoarseGrainedExecutorBackend: Cannot register with 
driver: spark://CoarseGrainedScheduler@192.168.0.136:51197
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 
seconds. This timeout is controlled by spark.rpc.askTimeout
 at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
 at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
 at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
 at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
 at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
 at scala.util.Try$.apply(Try.scala:192)
 at scala.util.Failure.recover(Try.scala:216)
 at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
 at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at 
org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
 at 
scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at scala.concurrent.Promise$class.complete(Promise.scala:55)
 at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
 at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
 at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
 at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
 at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
 at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
 at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
 at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
 at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
 at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
 at 
org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
 at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 
120 seconds
 ... 8 more
17/01/20 06:39:05 ERROR CoarseGrainedExecutorBackend: Driver 
192.168.0.136:51197 disassociated! Shutting down.

   

2017-01-20


lk_spark 
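Not a fix for the timeout itself, but a common alternative to shelling out to spark-submit is Spark's programmatic launcher API. A sketch (shown in Scala; the paths and class name are copied from the message above):

import org.apache.spark.launcher.SparkLauncher

// Sketch: launch the same job through Spark's launcher API instead of
// /bin/sh + spark-submit.
val handle = new SparkLauncher()
  .setSparkHome("/home/hadoop/dmp/spark-2.0.2-bin-hadoop2.7")
  .setAppResource("/home/hadoop/dmp/dataload-1.0-SNAPSHOT-jar-with-dependencies.jar")
  .setMainClass("com.hua.spark.dataload.DataLoadFromBase64JSON")
  .setMaster("yarn")
  .setDeployMode("client")
  .startApplication()   // returns a SparkAppHandle; launch() returns a Process instead

// handle.getState can be polled, or a listener registered, to track the job.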

Re: "Unable to load native-hadoop library for your platform" while running Spark jobs

2017-01-20 Thread Steve Loughran

On 19 Jan 2017, at 10:59, Sean Owen 
mailto:so...@cloudera.com>> wrote:

It's a message from Hadoop libs, not Spark. It can be safely ignored. It's just 
saying you haven't installed the additional (non-Apache-licensed) native libs 
that can accelerate some operations. This is something you can easily have read 
more about online.


I'd worry about it in production, as it means that the higher-performance code for 
compression and HDFS encryption at rest (if used) isn't there, and it will fall 
back to the java libraries. Which aren't as bad as you'd think, at least for 
decompression, but they do tend to underperform.

for dev/test/local work, you can turn off the warning entirely

log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR



On Thu, Jan 19, 2017 at 10:57 AM Md. Rezaul Karim 
mailto:rezaul.ka...@insight-centre.org>> wrote:
Hi All,

I'm getting the following WARNING while running Spark jobs in standalone 
mode:
Unable to load native-hadoop library for your platform... using builtin-java 
classes where applicable

Please note that I have configured the native path and the other ENV variables 
as follows:
export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_COMMON_LIB_NATIVE_DIR=/usr/local/hadoop/lib/native
export LD_LIBRARY_PATH=/usr/local/hadoop/lib/native/:$LD_LIBRARY_PATH
export JAVA_LIBRARY_PATH=/usr/local/hadoop/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"


My Spark job executes successfully and writes the results to a file at 
the end. However, I am not getting any logs to track the progress.

Could someone help me to solve this problem?




Regards,
_
Md. Rezaul Karim, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html



Re: Anyone has any experience using spark in the banking industry?

2017-01-20 Thread Steve Loughran

> On 18 Jan 2017, at 21:50, kant kodali  wrote:
> 
> Does anyone have experience using spark in the banking industry? I have a couple 
> of questions.

> 2. How can I make spark cluster highly available across multi datacenter? Any 
> pointers?


That's not, AFAIK, been a design goal. The communications and scheduling for 
spark assume that (a) there's low latency between executors and driver, and (b) 
that data is close enough to any executor that you don't have to get placement 
right: if you can't schedule work near the data, then running it on other nodes 
is better than not running work. oh, and failure modes are that of a single 
cluster: node and rack failures, not a single long-haul connection which could 
cut the entire cluster in half. If that happens, then all work running on the 
cluster without the driver is lost: you don't have the failure resilience you 
wanted, as it means that if the cluster with the driver actually failed, then 
the other cluster would not be able to take over.

Similarly, cluster filesystems tend to assume they are single DC, with its 
failure modes. life is more complex across two sites. I do know HDFS doesn't 
handle it, though there are things that do.

I would try to come up with a strategy for having separate applications running 
on the different DCs, with a story for data replication and reconciliation.

Even there, though, there'll inevitably be an SPOF. How do you find it? You 
Wait: it will find you.

-steve

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Saving from Dataset to Bigquery Table

2017-01-20 Thread Jörn Franke
It is only on pairdd

> On 20 Jan 2017, at 11:54, A Shaikh  wrote:
> 
> Has anyone experience saving Dataset to Bigquery Table?
> 
> I am loading into BigQuery using the following example sucessfully. This uses 
> RDD.saveAsNewAPIHadoopDataset method to save data. 
> I am using Dataset(or DataFrame) and looking for saveAsNewAPIHadoopDataset 
> method but unable to find it.
> 


Re: Non-linear (curved?) regression line

2017-01-20 Thread Sean Owen
I don't think this is a Spark question. This isn't a problem you solve by
throwing all combinations of options at it. Your target is not a linear
function of input, or its square, and it's not a question of GLM link
function. You may need to look at the log-log plot because this looks like
a power-law distribution. I think you want to learn more about regression
and what it does first.

On Fri, Jan 20, 2017 at 2:10 AM Ganesh  wrote:

>
> Has anyone worked on non-linear/curved regression lines with Apache Spark?
> This seems to be such a trivial issue but I have given up after
> experimenting for nearly two weeks.
> The plot line is as below and the raw data in the table at the end.
>  I just can't get Spark ML to give decent predictions with
> LinearRegression or any family in GeneralizedLinearRegression.
>
> I need to predict 'sales per day' given SalesRank. As the chart shows, it's
> some kind of exponential function: the lower the rank, the exponentially
> higher the sales.
>
> Things I have tried:
> Polynomial by taking square of features
> Changing family for GLR
> Changing regression parameters
> Sacrificing a goat to the Apache gods.
>
> How do I go about solving this? Do I have to resort to neural networks?
>
>
>
>
> Features Label
> 1 4358
> 5 4283
> 10 4193
> 15 4104
> 20 4017
> 50 3532
> 100 2851
> 150 2302
> 200 1858
> 250 1499
> 500 989
> 1000 553
> 2000 367
> 3500 221
> 5000 139
> 6000 126
> 7500 108
> 9000 92
> 1 83
> 5 12
> 75000 5
>
>
>
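To illustrate Sean's point — a sketch, not a claim that this is the right model for the data: fit a plain LinearRegression on a log-transformed feature and label, since a power law sales = a * rank^b becomes linear after taking logs of both sides. A few points from the table above are used as sample input:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.log

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// A few of the (salesRank, salesPerDay) points from the table above.
val raw = Seq((1.0, 4358.0), (10.0, 4193.0), (100.0, 2851.0),
              (1000.0, 553.0), (9000.0, 92.0)).toDF("rank", "sales")

// log-log transform: sales = a * rank^b  becomes
// log(sales) = log(a) + b * log(rank), which is linear.
val logged = raw.withColumn("logRank", log($"rank"))
                .withColumn("logSales", log($"sales"))

val assembled = new VectorAssembler()
  .setInputCols(Array("logRank"))
  .setOutputCol("features")
  .transform(logged)

val model = new LinearRegression()
  .setLabelCol("logSales")
  .setFeaturesCol("features")
  .fit(assembled)

// Predictions come back in log space; exp() recovers sales per day.
model.transform(assembled).select("rank", "sales", "prediction").show()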


Saving from Dataset to Bigquery Table

2017-01-20 Thread A Shaikh
Has anyone experience saving Dataset to Bigquery Table?

I am loading into BigQuery using the following example
sucessfully.
This uses RDD.saveAsNewAPIHadoopDataset method to save data.
I am using Dataset(or DataFrame) and looking for saveAsNewAPIHadoopDataset
method but unable to find it.


Re: physical memory usage keep increasing for spark app on Yarn

2017-01-20 Thread Pavel Plotnikov
Hi Yang,
I have faced the same problem on Mesos, and to circumvent this issue I
usually increase the partition number. In the last step of your code you reduce
the number of partitions to 1; try to set a bigger value, maybe it will solve this
problem.

Cheers,
Pavel

On Fri, Jan 20, 2017 at 12:35 PM Yang Cao  wrote:

> Hi all,
>
> I am running a spark application on YARN-client mode with 6 executors
> (each 4 cores and executor memory = 6G and Overhead = 4G, spark version:
> 1.6.3 / 2.1.0). I find that my executor memory keeps increasing until it gets
> killed by the node manager, which tells me to boost
> spark.yarn.executor.memoryOverhead.
> I know that this param mainly controls the size of memory allocated
> off-heap. But I don’t know when and how the spark engine will use this part
> of memory. Also, increasing that part of memory does not always solve my
> problem: sometimes it works, sometimes not. It tends to be useless when the
> input data is large.
>
> FYI, my app’s logic is quite simple. It means to combine the small files
> generated in one single day (one directory one day) into a single one and
> write back to hdfs. Here is the core code:
>
> val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = 
> ${ts.day}").coalesce(400)
>
> val dropDF = 
> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
>
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
>
> The source may have hundreds to thousands of partitions. And the
> total parquet output is around 1 to 5 gigs. Also I find that in the step that
> shuffle-reads data from different machines, the size of the shuffle read is
> about 4 times larger than the input size, which is weird, or based on some
> principle I don’t know.
>
> Anyway, I have done some search myself for this problem. Some article said
> that it’s on the direct buffer memory (I don’t set myself). Some article
> said that people solve it with more frequent full GC. Also I find one
> people on SO with very similar situation:
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
> This guy claimed that it’s a bug with parquet but comment questioned him.
> People in this mail list may also receive an email hours ago from
> blondowski who described this problem while writing json:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
>
> So it looks like to be common question for different output format. I hope
> someone with experience about this problem could make an explanation about
> this issue. Why this happen and what is a reliable way to solve this
> problem.
>
> Best,
>
>
>
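To make the suggestion concrete, a sketch of the relevant change to the code quoted above (df and targetpath as defined there; the partition count of 20 is illustrative):

import org.apache.spark.sql.SaveMode

val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")

// Option 1: keep more output partitions (e.g. 20 instead of 1) so no single
// task has to buffer the whole day's data.
dropDF.repartition(20).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)

// Option 2: coalesce(20) avoids a full shuffle while still reducing file count.
// dropDF.coalesce(20).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)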


physical memory usage keep increasing for spark app on Yarn

2017-01-20 Thread Yang Cao
Hi all,

I am running a spark application on YARN-client mode with 6 executors (each 4 
cores and executor memory = 6G and Overhead = 4G, spark version: 1.6.3 / 
2.1.0). I find that my executor memory keeps increasing until it gets killed by 
the node manager, which tells me to boost 
spark.yarn.executor.memoryOverhead. I know that this param mainly controls the 
size of memory allocated off-heap. But I don’t know when and how the spark 
engine will use this part of memory. Also, increasing that part of memory does not 
always solve my problem: sometimes it works, sometimes not. It tends to be useless 
when the input data is large.

FYI, my app’s logic is quite simple. It means to combine the small files 
generated in one single day (one directory one day) into a single one and write 
back to hdfs. Here is the core code:
val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = 
${ts.day}").coalesce(400)
val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
The source may have hundreds to thousands of partitions. And the total 
parquet output is around 1 to 5 gigs. Also I find that in the step that shuffle-
reads data from different machines, the size of the shuffle read is about 4 times 
larger than the input size, which is weird, or based on some principle I don’t know. 

Anyway, I have done some search myself for this problem. Some article said that 
it’s on the direct buffer memory (I don’t set myself). Some article said that 
people solve it with more frequent full GC. Also I find one people on SO with 
very similar situation: 
http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
This guy claimed that it’s a bug with parquet but comment questioned him. 
People in this mail list may also receive an email hours ago from blondowski 
who described this problem while writing json: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none

So it looks like to be common question for different output format. I hope 
someone with experience about this problem could make an explanation about this 
issue. Why this happen and what is a reliable way to solve this problem. 

Best,




Re: TDD in Spark

2017-01-20 Thread A Shaikh
Thanks for all the suggestions. Very helpful.

On 17 January 2017 at 22:04, Lars Albertsson  wrote:

> My advice, short version:
> * Start by testing one job per test.
> * Use Scalatest or a standard framework.
> * Generate input datasets with Spark routines, write to local file.
> * Run job with local master.
> * Read output with Spark routines, validate only the fields you care
> about for the test case at hand.
> * Focus on building a functional regression test suite with small test
> cases before testing with large input datasets. The former improves
> productivity more.
>
> Avoid:
> * Test frameworks coupled to your processing technology - they will
> make it difficult to switch.
> * Spending much effort to small unit tests. Internal interfaces in
> Spark tend to be volatile, and testing against them results in high
> maintenance costs.
> * Input files checked in to version control. They are difficult to
> maintain. Generate input files with code instead.
> * Expected output files checked in to VC. Same reason. Validate
> selected fields instead.
>
> For a longer answer, please search for my previous posts to the user
> list, or watch this presentation: https://vimeo.com/192429554
>
> Slides at http://www.slideshare.net/lallea/test-strategies-for-
> data-processing-pipelines-67244458
>
>
> Regards,
>
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> https://twitter.com/lalleal
> +46 70 7687109
> Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com
>
>
> On Sun, Jan 15, 2017 at 7:14 PM, A Shaikh  wrote:
> > What's the most popular testing approach for a Spark app? I am looking for
> > something along the lines of TDD.
>
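A minimal sketch of the approach Lars describes — ScalaTest, local master, input generated with Spark routines, assertions on selected fields only. The upperCaseNames job is a stand-in for whatever the pipeline under test actually does:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.upper
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// The "job" under test: a stand-in for a real pipeline step.
object NamesJob {
  def upperCaseNames(in: DataFrame): DataFrame =
    in.withColumn("name", upper(in("name")))
}

class NamesJobSpec extends FunSuite with BeforeAndAfterAll {
  private var spark: SparkSession = _

  override def beforeAll(): Unit = {
    spark = SparkSession.builder()
      .master("local[2]")          // local master, as suggested above
      .appName("names-job-test")
      .getOrCreate()
  }

  override def afterAll(): Unit = spark.stop()

  test("names are upper-cased") {
    import spark.implicits._
    // Generate the input with Spark routines rather than checked-in files.
    val input = Seq((1, "alice"), (2, "bob")).toDF("id", "name")

    val result = NamesJob.upperCaseNames(input)

    // Validate only the fields this test cares about.
    val names = result.select("name").as[String].collect().toSet
    assert(names === Set("ALICE", "BOB"))
  }
}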