Re: PySpark 1.6.1: 'builtin_function_or_method' object has no attribute '__code__' in Pickles

2016-07-29 Thread Bhaarat Sharma
I'm very new to Spark. I'm running it on a single CentOS 7 box. How would I
add test.py to spark-submit? Pointers to any resources would be great.
Thanks for your help.

On Sat, Jul 30, 2016 at 1:28 AM, ayan guha  wrote:

> I think you need to add test.py to spark-submit so that it gets shipped to
> all executors
>
> On Sat, Jul 30, 2016 at 3:24 PM, Bhaarat Sharma 
> wrote:
>
>> I am using PySpark 1.6.1. In my python program I'm using ctypes and
>> trying to load the liblept library via the liblept.so.4.0.2 file on my
>> system.
>>
>> While trying to load the library via cdll.LoadLibrary("liblept.so.4.0.2")
>> I get an error : 'builtin_function_or_method' object has no attribute
>> '__code__'
>>
>> Here are my files
>>
>> test.py
>>
>> from ctypes import *
>>
>> class FooBar:
>>     def __init__(self, options=None, **kwargs):
>>         if options is not None:
>>             self.options = options
>>
>>     def read_image_from_bytes(self, bytes):
>>         return "img"
>>
>>     def text_from_image(self, img):
>>         self.leptonica = cdll.LoadLibrary("liblept.so.4.0.2")
>>         return "test from foobar"
>>
>>
>> spark.py
>>
>> from pyspark import SparkContext
>> import test
>> import numpy as np
>> sc = SparkContext("local", "test")
>> foo = test.FooBar()
>>
>> def file_bytes(rawdata):
>>     return np.asarray(bytearray(rawdata), dtype=np.uint8)
>>
>> def do_some_with_bytes(bytes):
>>     return foo.do_something_on_image(foo.read_image_from_bytes(bytes))
>>
>> images = sc.binaryFiles("/myimages/*.jpg")
>> image_to_text = lambda rawdata: do_some_with_bytes(file_bytes(rawdata))
>> print images.values().map(image_to_text).take(1) #this gives an error
>>
>>
>> What is the way to load this library?
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: PySpark 1.6.1: 'builtin_function_or_method' object has no attribute '__code__' in Pickles

2016-07-29 Thread ayan guha
I think you need to add test.py to spark-submit so that it gets shipped to
all executors
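
For example, a minimal sketch of that on the command line (spark.py as the
driver script is just an assumption here):

    spark-submit --py-files test.py spark.py

Equivalently, calling sc.addPyFile("test.py") from the driver ships the module
to the executors.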

On Sat, Jul 30, 2016 at 3:24 PM, Bhaarat Sharma  wrote:

> I am using PySpark 1.6.1. In my python program I'm using ctypes and trying
> to load the liblept library via the liblept.so.4.0.2 file on my system.
>
> While trying to load the library via cdll.LoadLibrary("liblept.so.4.0.2")
> I get an error : 'builtin_function_or_method' object has no attribute
> '__code__'
>
> Here are my files
>
> test.py
>
> from ctypes import *
>
> class FooBar:
>     def __init__(self, options=None, **kwargs):
>         if options is not None:
>             self.options = options
>
>     def read_image_from_bytes(self, bytes):
>         return "img"
>
>     def text_from_image(self, img):
>         self.leptonica = cdll.LoadLibrary("liblept.so.4.0.2")
>         return "test from foobar"
>
>
> spark.py
>
> from pyspark import SparkContext
> import test
> import numpy as np
> sc = SparkContext("local", "test")
> foo = test.FooBar()
>
> def file_bytes(rawdata):
>     return np.asarray(bytearray(rawdata), dtype=np.uint8)
>
> def do_some_with_bytes(bytes):
>     return foo.do_something_on_image(foo.read_image_from_bytes(bytes))
>
> images = sc.binaryFiles("/myimages/*.jpg")
> image_to_text = lambda rawdata: do_some_with_bytes(file_bytes(rawdata))
> print images.values().map(image_to_text).take(1) #this gives an error
>
>
> What is the way to load this library?
>
>


-- 
Best Regards,
Ayan Guha


Re: Java Recipes for Spark

2016-07-29 Thread ayan guha
Hi

Is there anything similar for Python? If not, I can create one.

On Sat, Jul 30, 2016 at 2:19 PM, Shiva Ramagopal  wrote:

> +1 for the Java love :-)
>
> On 30-Jul-2016 4:39 AM, "Renato Perini"  wrote:
>
>> Not only very useful, but finally some Java love :-)
>>
>> Thank you.
>>
>>
>> Il 29/07/2016 22:30, Jean Georges Perrin ha scritto:
>>
>>> Sorry if this looks like a shameless self promotion, but some of you
>>> asked me to say when I'll have my Java recipes for Apache Spark updated.
>>> It's done here: http://jgp.net/2016/07/22/spark-java-recipes/ and in
>>> the GitHub repo.
>>>
>>> Enjoy / have a great week-end.
>>>
>>> jg
>>>
>>>
>>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


-- 
Best Regards,
Ayan Guha


PySpark 1.6.1: 'builtin_function_or_method' object has no attribute '__code__' in Pickles

2016-07-29 Thread Bhaarat Sharma
I am using PySpark 1.6.1. In my python program I'm using ctypes and trying
to load the liblept library via the liblept.so.4.0.2 file on my system.

While trying to load the library via cdll.LoadLibrary("liblept.so.4.0.2") I
get an error : 'builtin_function_or_method' object has no attribute
'__code__'

Here are my files

test.py

from ctypes import *

class FooBar:
    def __init__(self, options=None, **kwargs):
        if options is not None:
            self.options = options

    def read_image_from_bytes(self, bytes):
        return "img"

    def text_from_image(self, img):
        self.leptonica = cdll.LoadLibrary("liblept.so.4.0.2")
        return "test from foobar"


spark.py

from pyspark import SparkContext
import test
import numpy as np
sc = SparkContext("local", "test")
foo = test.FooBar()

def file_bytes(rawdata):
    return np.asarray(bytearray(rawdata), dtype=np.uint8)

def do_some_with_bytes(bytes):
    return foo.do_something_on_image(foo.read_image_from_bytes(bytes))

images = sc.binaryFiles("/myimages/*.jpg")
image_to_text = lambda rawdata: do_some_with_bytes(file_bytes(rawdata))
print images.values().map(image_to_text).take(1) #this gives an error


What is the way to load this library?


Re: Java Recipes for Spark

2016-07-29 Thread Shiva Ramagopal
+1 for the Java love :-)

On 30-Jul-2016 4:39 AM, "Renato Perini"  wrote:

> Not only very useful, but finally some Java love :-)
>
> Thank you.
>
>
> Il 29/07/2016 22:30, Jean Georges Perrin ha scritto:
>
>> Sorry if this looks like a shameless self promotion, but some of you
>> asked me to say when I'll have my Java recipes for Apache Spark updated.
>> It's done here: http://jgp.net/2016/07/22/spark-java-recipes/ and in the
>> GitHub repo.
>>
>> Enjoy / have a great week-end.
>>
>> jg
>>
>>
>>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


sql to spark scala rdd

2016-07-29 Thread kali.tumm...@gmail.com
Hi All, 

I managed to write the business requirement in Spark SQL and Hive. I am still
learning Scala; how would the SQL below be written using Spark RDDs rather than
Spark DataFrames?

SELECT DATE,balance,
SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW) daily_balance
FROM  table
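
A sketch of one way to express that running total with plain RDDs in Scala
(column names and sample data are assumptions), using a sort plus per-partition
prefix sums:

    import org.apache.spark.{SparkConf, SparkContext}

    object RunningBalance {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("running-balance").setMaster("local[*]"))

        // Stand-in for the table: (date, balance) pairs.
        val rows = sc.parallelize(Seq(("2016-07-01", 100.0), ("2016-07-02", 50.0), ("2016-07-03", -30.0)))

        // ORDER BY DATE
        val sorted = rows.sortBy(_._1).cache()

        // Per-partition totals, collected to build a starting offset for each partition.
        val partSums = sorted.mapPartitions(it => Iterator(it.map(_._2).sum)).collect()
        val offsets = partSums.scanLeft(0.0)(_ + _) // offsets(i) = sum of all partitions before i

        // Running total within each partition, seeded with that partition's offset; this
        // reproduces SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
        val dailyBalance = sorted.mapPartitionsWithIndex { (i, it) =>
          var acc = offsets(i)
          it.map { case (date, bal) => acc += bal; (date, bal, acc) }
        }

        dailyBalance.collect().foreach(println)
        sc.stop()
      }
    }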





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Java Recipes for Spark

2016-07-29 Thread Renato Perini

Not only very useful, but finally some Java love :-)

Thank you.


Il 29/07/2016 22:30, Jean Georges Perrin ha scritto:
Sorry if this looks like a shameless self promotion, but some of you 
asked me to say when I'll have my Java recipes for Apache Spark 
updated. It's done here: 
http://jgp.net/2016/07/22/spark-java-recipes/ and in the GitHub repo.


Enjoy / have a great week-end.

jg





-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark 1.6.1 Workaround: Properly handle signal kill of ApplicationMaster

2016-07-29 Thread jatinder85
https://issues.apache.org/jira/browse/SPARK-13642

Does anybody know a reliable workaround for this issue in 1.6.1?


Thanks,
Jatinder



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-6-1-Workaround-Properly-handle-signal-kill-of-ApplicationMaster-tp27431.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Problems initializing SparkUI

2016-07-29 Thread Mich Talebzadeh
Why chance it? Best to explicitly specify in spark-submit (or whatever)
which port to listen on

 --conf "spark.ui.port=nnn"

and see if it works
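
For example, with an arbitrary free port (4050 here is only an illustration):

    spark-submit --conf "spark.ui.port=4050" ...

and then, from the driver host, confirm something is actually listening there
with e.g. ss -ltn | grep 4050 (or netstat -tlnp).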

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 29 July 2016 at 23:37, Jacek Laskowski  wrote:

> Hi,
>
> I'm curious about "For some reason, sometimes the SparkUI does not
> appear to be bound on port 4040 (or any other) but the application
> runs perfectly and finishes giving the expected answer." How do you
> check that web UI listens to the port 4040?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Thu, Jul 28, 2016 at 11:37 PM, Maximiliano Patricio Méndez
>  wrote:
> > Hi,
> >
> > I'm having some trouble trying to submit an application to my spark
> cluster.
> > For some reason, sometimes the SparkUI does not appear to be bound on
> port
> > 4040 (or any other) but the application runs perfectly and finishes
> giving
> > the expected answer.
> >
> > And don't know why, but if I restart all the workers at once sometimes it
> > begins to work and sometimes it doesn't.
> >
> > In the driver logs, when it fails to start the SparkUI I see some of these
> > lines:
> > 16/07/28 16:13:37 INFO Utils: Successfully started service 'SparkUI' on
> port
> > 4040.
> > 16/07/28 16:13:37 INFO SparkUI: Started SparkUI at
> http://hostname-00:4040
> >
> > but nothing running in those ports.
> >
> > I'm attaching the full driver log in which I've activated jetty logs on
> > DEBUG but couldn't find anything.
> >
> > The only properties that I'm not leaving at default at the configuration
> is
> > the SPARK_PUBLIC_DNS=$(hostname), SPARK_WORKER_CORES and
> SPARK_WORKER_MEMORY
> >
> > Has anyone faced something similar?
> >
> > Thanks
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Problems initializing SparkUI

2016-07-29 Thread Jacek Laskowski
Hi,

I'm curious about "For some reason, sometimes the SparkUI does not
appear to be bound on port 4040 (or any other) but the application
runs perfectly and finishes giving the expected answer." How do you
check that web UI listens to the port 4040?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Jul 28, 2016 at 11:37 PM, Maximiliano Patricio Méndez
 wrote:
> Hi,
>
> I'm having some trouble trying to submit an application to my spark cluster.
> For some reason, sometimes the SparkUI does not appear to be bound on port
> 4040 (or any other) but the application runs perfectly and finishes giving
> the expected answer.
>
> And don't know why, but if I restart all the workers at once sometimes it
> begins to work and sometimes it doesn't.
>
> In the driver logs, when it fails to start the SparkUI I see some of these
> lines:
> 16/07/28 16:13:37 INFO Utils: Successfully started service 'SparkUI' on port
> 4040.
> 16/07/28 16:13:37 INFO SparkUI: Started SparkUI at http://hostname-00:4040
>
> but nothing running in those ports.
>
> I'm attaching the full driver log in which I've activated jetty logs on
> DEBUG but couldn't find anything.
>
> The only properties that I'm not leaving at default at the configuration is
> the SPARK_PUBLIC_DNS=$(hostname), SPARK_WORKER_CORES and SPARK_WORKER_MEMORY
>
> Has anyone faced something similar?
>
> Thanks
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Java Recipes for Spark

2016-07-29 Thread Gavin Yue
This is useful:) 

Thank you for sharing. 



> On Jul 29, 2016, at 1:30 PM, Jean Georges Perrin  wrote:
> 
> Sorry if this looks like a shameless self promotion, but some of you asked me 
> to say when I'll have my Java recipes for Apache Spark updated. It's done 
> here: http://jgp.net/2016/07/22/spark-java-recipes/ and in the GitHub repo. 
> 
> Enjoy / have a great week-end.
> 
> jg
> 
> 


Java Recipes for Spark

2016-07-29 Thread Jean Georges Perrin
Sorry if this looks like a shameless self promotion, but some of you asked me 
to say when I'll have my Java recipes for Apache Spark updated. It's done here: 
http://jgp.net/2016/07/22/spark-java-recipes/ 
 and in the GitHub repo. 

Enjoy / have a great week-end.

jg




use big files and read from HDFS was: performance problem when reading lots of small files created by spark streaming.

2016-07-29 Thread Andy Davidson
Hi Pedro

I did some experiments using one of our relatively small data sets. The
data set is loaded into 3 or 4 data frames. I then call count().


Looks like using bigger files and reading from HDFS is a good solution for
reading data. I guess I'll need to do something similar to this to deal with
the S3 write performance.

I think this could probably be tuned up a bit. I randomly chose a max of 30
partitions for each data frame. When I combined files I checked that the
combined file size was < 64 MB (64,000,000 bytes); however, in practice they are
bigger.

Andy


execution time | src  | is coalesce | file size | num files
39min 44s      | s3   | False       | small     | 270,518
32min 24s      | s3   | 30          | small     | 270,518
3min 09s       | HDFS | 30          | small     | 270,518
4min 24s       | HDFS | False       | small     | 270,518
2min 19s       | HDFS | False       | big       | 1,046
2min 06s       | HDFS | 30          | big       | 1,046


From:  Andrew Davidson 
Date:  Thursday, July 28, 2016 at 8:58 AM
To:  Pedro Rodriguez 
Cc:  "user @spark" 
Subject:  Re: performance problem when reading lots of small files created
by spark streaming.

> Hi Pedro
> 
> Thanks for the explanation. I started watching your repo. In the short term I
> think I am going to try concatenating my small files into 64MB and using HDFS.
> My spark streaming app is implemented in Java and uses data frames. It writes to
> s3. My batch processing is written in python It reads data into data frames.
> 
> It's probably a lot of work to make your solution work in these other
> contexts.
> 
> Here is another use case you might be interested in
> Writing multiple files to S3 is really slow. It causes a lot of problems for
> my streaming app. Bad things happen if your processing time exceeds your
> window length. Our streaming app must save all the input. For each mini batch
> we split the input into as many as 30 different data sets. Each one needs to
> be written to S3.
> 
> As a temporary work around I use an executor service to try and get more
> concurrent writes. Ideally the Spark framework would provide support for
> async IO, and hopefully the S3 performance issue would be improved. Here is my
> code if you are interested
> 
> 
> public class StreamingKafkaGnipCollector {
>
>     static final int POOL_SIZE = 30;
>     static ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
>
>     …
>
>     private static void saveRawInput(SQLContext sqlContext,
>             JavaPairInputDStream messages, String outputURIBase) {
>
>         JavaDStream lines = messages.map(new Function, String>() {
>             private static final long serialVersionUID = 1L;
>
>             @Override
>             public String call(Tuple2 tuple2) {
>                 //logger.warn("TODO _2:{}", tuple2._2);
>                 return tuple2._2();
>             }
>         });
>
>         lines.foreachRDD(new VoidFunction2() {
>             @Override
>             public void call(JavaRDD jsonRDD, Time time) throws Exception {
>                 …
>                 // df.write().json("s3://"); is very slow
>                 // run saves concurrently
>                 List saveData = new ArrayList(100);
>                 for (String tag : tags) {
>                     DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>                     String dirPath = createPath(outputURIBase, date, tag, milliSeconds);
>                     saveData.add(new SaveData(saveDF, dirPath));
>                 }
>
>                 saveImpl(saveData, executor); // concurrent writes to S3
>             }
>
>             private void saveImpl(List saveData, ExecutorService executor) {
>                 List runningThreads = new ArrayList(POOL_SIZE);
>                 for (SaveData data : saveData) {
>                     SaveWorker worker = new SaveWorker(data);
>                     Future f = executor.submit(worker);
>                     runningThreads.add(f);
>                 }
>                 // wait for all the workers to complete
>                 for (Future worker : runningThreads) {
>                     try {
>                         worker.get();
>                         logger.debug("worker completed");
>                     } catch (InterruptedException e) {
>                         logger.error("", e);
>                     } catch (ExecutionException e) {
>                         logger.error("", e);
>                     }
>                 }
>             }
>
>     static class SaveData {
>         private DataFrame df;
>         private String path;
>
>         SaveData(DataFrame df, String path) {
>             this.df = df;
>             this.path = path;
>         }
>     }
>
>     static class SaveWorker implements Runnable {
>         SaveData data;
>
>         public SaveWorker(SaveData data) {
>             this.data = data;
>         }
>
>         @Override
>         public void run() {
>             if (data.df.count() >= 1) {
>                 data.df.write().json(data.path);
>             }
>         }
>     }
> }
> 
> 
> 
> From:  Pedro Rodriguez 
> Date:  Wednesday, July 27, 2016 at 8:40 PM
> To:  Andrew Davidson 
> Cc:  "user @spark" 
> Subject:  Re: performance problem when reading lots of small files created by
> spark streaming.
> 
>> There are a few blog posts that detail one possible/likely issue for example:
>> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>> 
>> TLDR: The hadoop libraries spark uses assumes that its input comes from a
>> file 

Spark 1.6.1 Workaround: Properly handle signal kill of ApplicationMaster

2016-07-29 Thread Jatinder Assi
https://issues.apache.org/jira/browse/SPARK-13642

Does anybody know a reliable workaround for this issue in 1.6.1?


Thanks,
Jatinder


pyspark 1.6.1 `partitionBy` does not provide meaningful information for `join` to use

2016-07-29 Thread Sisyphuss
import numpy as np

def id(x):
    return x

rdd = sc.parallelize(np.arange(1000))
rdd = rdd.map(lambda x: (x,1))
rdd = rdd.partitionBy(8, id)
rdd = rdd.cache().setName('milestone')
rdd.join(rdd).collect()

The above code generates this DAG (screenshots of the DAG and the zoomed-in
views of Stage 13 and Stage 14 are not included here).


The green box is cached 'milestone'. Normally, it should contain partition
information.
However, there is still shuffling in `join()`.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-1-6-1-partitionBy-does-not-provide-meaningful-information-for-join-to-use-tp27429.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



multiple SPARK_LOCAL_DIRS causing strange behavior in parallelism

2016-07-29 Thread Saif.A.Ellafi
Hi all,

I was currently playing around with spark-env around SPARK_LOCAL_DIRS in order 
to add additional shuffle storage.

But since I did this, I am getting a "too many open files" error if the total 
executor cores is high. I am also seeing low parallelism: monitoring the running 
tasks on some big job, most tasks run on the driver host and very few run on 
other nodes, while using ANY locality.

Generally speaking, Could I be doing anything wrong regarding this setting?

On each node I am setting different local physical hard drives to store 
shuffle information. Reverting this configuration to a single folder of storage 
on each node, everything runs normally.
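
For reference, this is the kind of setting being described, in conf/spark-env.sh
on each node (the paths are hypothetical):

    SPARK_LOCAL_DIRS=/disk1/spark-local,/disk2/spark-local,/disk3/spark-local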

Thanks,
Saif



RE: HBase-Spark Module

2016-07-29 Thread David Newberger
Hi Ben,

This seems more like a question for community.cloudera.com. However, it would 
be in hbase not spark I believe. 

https://repository.cloudera.com/artifactory/webapp/#/artifacts/browse/tree/General/cloudera-release-repo/org/apache/hbase/hbase-spark

David Newberger


-Original Message-
From: Benjamin Kim [mailto:bbuil...@gmail.com] 
Sent: Friday, July 29, 2016 12:57 PM
To: user@spark.apache.org
Subject: HBase-Spark Module

I would like to know if anyone has tried using the hbase-spark module? I tried 
to follow the examples in conjunction with CDH 5.8.0. I cannot find the 
HBaseTableCatalog class in the module or in any of the Spark jars. Can someone 
help?

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: The main difference use case between orderBY and sort

2016-07-29 Thread Mich Talebzadeh
Within the realm of ANSI SQL there is ORDER BY but no SORT BY.

ORDER BY sorts the result set in ascending or descending order. In SQL,
sorting is the term and ORDER BY is part of the syntax.

In the map-reduce paradigm, for example in Hive QL, SORT BY sorts data per
reducer. As I understand the difference between ORDER BY and SORT BY is
that ORDER BY guarantees total order in the output while SORT BY only
guarantees ordering of the rows within a reducer. If there are more than
one reducer, SORT BY may give partially ordered final results.

hive> select prod_id, quantity_sold, amount_sold from sales *sort by*
quantity_sold asc, amount_sold desc

compared to

hive> select prod_id, quantity_sold, amount_sold from sales *order by*
quantity_sold asc, amount_sold desc


Personally, I stick to order by
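
For completeness, a rough Spark-side parallel, as a sketch assuming a DataFrame
df over the same sales table:

    import org.apache.spark.sql.functions.col

    // total ordering, like ORDER BY
    df.orderBy(col("quantity_sold").asc, col("amount_sold").desc)
    // ordering within each partition only, like Hive's SORT BY
    df.sortWithinPartitions(col("quantity_sold").asc, col("amount_sold").desc)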

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 29 July 2016 at 17:49, Daniel Santana  wrote:

> As far as I know *sort* is just an alias of *orderBy* (or vice-versa)
>
> And your last operation is taking longer because you are sorting it twice.
>
> --
> *Daniel Santana*
> Senior Software Engineer
>
> EVERY*MUNDO*
> 25 SE 2nd Ave., Suite 900
> Miami, FL 33131 USA
> main:+1 (305) 375-0045
> EveryMundo.com 
>
> *Confidentiality Notice: *This email and any files transmitted with it
> are confidential and intended solely for the use of the individual or
> entity to whom they are addressed. If you have received this email in
> error, please notify the system manager.
>
> On Fri, Jul 29, 2016 at 12:20 PM, Ashok Kumar <
> ashok34...@yahoo.com.invalid> wrote:
>
>> Hi,
>>
>> In Spark programing I can use
>>
>> df.filter(col("transactiontype") ===
>> "DEB").groupBy("transactiondate").agg(sum("debitamount").cast("Float").as("Total
>> Debit Card")).orderBy("transactiondate").show(5)
>>
>> or
>>
>> df.filter(col("transactiontype") ===
>> "DEB").groupBy("transactiondate").agg(sum("debitamount").cast("Float").as("Total
>> Debit Card")).sort("transactiondate").show(5)
>>
>> i get the same results
>>
>> and i can use both as well
>>
>> df.filter(col("transactiontype") ===
>> "DEB").groupBy("transactiondate").agg(sum("debitamount").cast("Float").as("Total
>> Debit Card")).orderBy("transactiondate").sort("transactiondate").show(5)
>>
>> but the last one takes more time.
>>
>> what is the use case for both these please. does it make sense to use
>> both?
>>
>> Thanks
>>
>
>


HBase-Spark Module

2016-07-29 Thread Benjamin Kim
I would like to know if anyone has tried using the hbase-spark module? I tried 
to follow the examples in conjunction with CDH 5.8.0. I cannot find the 
HBaseTableCatalog class in the module or in any of the Spark jars. Can someone 
help?

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: The main difference use case between orderBY and sort

2016-07-29 Thread Daniel Santana
As far as I know *sort* is just an alias of *orderBy* (or vice-versa)

And your last operation is taking longer because you are sorting it twice.
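
A quick way to convince yourself, assuming a DataFrame df: both calls print the
same physical plan, since one simply delegates to the other.

    df.sort("transactiondate").explain()
    df.orderBy("transactiondate").explain()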

-- 
*Daniel Santana*
Senior Software Engineer

EVERY*MUNDO*
25 SE 2nd Ave., Suite 900
Miami, FL 33131 USA
main:+1 (305) 375-0045
EveryMundo.com 

*Confidentiality Notice: *This email and any files transmitted with it are
confidential and intended solely for the use of the individual or entity to
whom they are addressed. If you have received this email in error, please
notify the system manager.

On Fri, Jul 29, 2016 at 12:20 PM, Ashok Kumar 
wrote:

> Hi,
>
> In Spark programing I can use
>
> df.filter(col("transactiontype") ===
> "DEB").groupBy("transactiondate").agg(sum("debitamount").cast("Float").as("Total
> Debit Card")).orderBy("transactiondate").show(5)
>
> or
>
> df.filter(col("transactiontype") ===
> "DEB").groupBy("transactiondate").agg(sum("debitamount").cast("Float").as("Total
> Debit Card")).sort("transactiondate").show(5)
>
> i get the same results
>
> and i can use both as well
>
> df.filter(col("transactiontype") ===
> "DEB").groupBy("transactiondate").agg(sum("debitamount").cast("Float").as("Total
> Debit Card")).orderBy("transactiondate").sort("transactiondate").show(5)
>
> but the last one takes more time.
>
> what is the use case for both these please. does it make sense to use both?
>
> Thanks
>


Re: sampling operation for DStream

2016-07-29 Thread Cody Koeninger
With most stream systems you're still going to incur the cost of reading
each message... I suppose you could rotate among reading just the
latest messages from a single partition of a Kafka topic if they were
evenly balanced.

But once you've read the messages, nothing's stopping you from
filtering most of them out before doing further processing.  The
dstream .transform method will let you do any filtering / sampling you
could have done on an rdd.
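
A minimal sketch of that in Scala, assuming an existing DStream[String] named
stream and keeping roughly 10% of each batch:

    val sampled = stream.transform(rdd => rdd.sample(withReplacement = false, fraction = 0.1))
    sampled.foreachRDD(rdd => println(s"sampled batch size: ${rdd.count()}"))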

On Fri, Jul 29, 2016 at 9:57 AM, Martin Le  wrote:
> Hi all,
>
> I have to handle high-speed rate data stream. To reduce the heavy load, I
> want to use sampling techniques for each stream window. It means that I want
> to process a subset of data instead of whole window data. I saw Spark
> support sampling operations for RDD, but for DStream, Spark supports
> sampling operation as well? If not,  could you please give me a suggestion
> how to implement it?
>
> Thanks,
> Martin

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



The main difference use case between orderBY and sort

2016-07-29 Thread Ashok Kumar
Hi,
In Spark programming I can use
df.filter(col("transactiontype") === 
"DEB").groupBy("transactiondate").agg(sum("debitamount").cast("Float").as("Total
 Debit Card")).orderBy("transactiondate").show(5)
or
df.filter(col("transactiontype") === 
"DEB").groupBy("transactiondate").agg(sum("debitamount").cast("Float").as("Total
 Debit Card")).sort("transactiondate").show(5)

i get the same results
and i can use both as well
df.filter(col("transactiontype") === 
"DEB").groupBy("transactiondate").agg(sum("debitamount").cast("Float").as("Total
 Debit Card")).orderBy("transactiondate").sort("transactiondate").show(5)

but the last one takes more time.
What is the use case for each of these, please? Does it make sense to use both?
Thanks

Tuning level of Parallelism: Increase or decrease?

2016-07-29 Thread Jestin Ma
I am processing ~2 TB of hdfs data using DataFrames. The size of a task is
equal to the block size specified by hdfs, which happens to be 128 MB,
leading to about 15000 tasks.

I'm using 5 worker nodes with 16 cores each and ~25 GB RAM.
I'm performing groupBy, count, and an outer-join with another DataFrame of
~200 MB size (~80 MB cached but I don't need to cache it), then saving to
disk.

Right now it takes about 55 minutes, and I've been trying to tune it.

I read on the Spark Tuning guide that:
*In general, we recommend 2-3 tasks per CPU core in your cluster.*

This means that I should have about 30-50 tasks instead of 15000, and each
task would be much bigger in size. Is my understanding correct, and is this
suggested? I've read from different sources to decrease or increase
parallelism, or even keep it default.

Thank you for your help,
Jestin


sampling operation for DStream

2016-07-29 Thread Martin Le
Hi all,

I have to handle a high-rate data stream. To reduce the heavy load, I
want to use sampling techniques for each stream window, i.e. process a
subset of the data instead of the whole window. I saw that Spark supports
sampling operations for RDDs, but does Spark support a sampling operation
for DStreams as well? If not, could you please give me a suggestion on
how to implement it?

Thanks,
Martin


Re: how to save spark files as parquets efficiently

2016-07-29 Thread Sumit Khanna
Great! Common sense is very uncommon.

On Fri, Jul 29, 2016 at 8:26 PM, Ewan Leith 
wrote:

> If you replace the df.write ….
>
>
>
> With
>
>
>
> df.count()
>
>
>
> in your code you’ll see how much time is taken to process the full
> execution plan without the write output.
>
>
>
> That code below looks perfectly normal for writing a parquet file yes,
> there shouldn’t be any tuning needed for “normal” performance.
>
>
>
> Thanks,
>
> Ewan
>
>
>
> *From:* Sumit Khanna [mailto:sumit.kha...@askme.in]
> *Sent:* 29 July 2016 13:41
> *To:* Gourav Sengupta 
> *Cc:* user 
> *Subject:* Re: how to save spark files as parquets efficiently
>
>
>
> Hey Gourav,
>
>
>
> Well so I think that it is my execution plan that is at fault. So
> basically df.write as a spark job on localhost:4040/ well being an action
> will include the time taken for all the umpteen transformation on it right?
> All I wanted to know is "what apt env/config params are needed to something
> simple read a dataframe from parquet and save it back as another parquet
> (meaning vanilla load/store no transformation). Is it good enough to simply
> read. and write. in the very format mentioned in spark tutorial docs i.e
>
>
>
> *df.write.format("parquet").mode("overwrite").save(hdfspathTemp) *??
>
>
>
> Thanks,
>
>
>
> On Fri, Jul 29, 2016 at 4:22 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
> Hi,
>
>
> The default write format in SPARK is parquet. And I have never faced any
> issues writing over a billion records in SPARK. Are you using
> virtualization by any chance or an obsolete hard disk or Intel Celeron may
> be?
>
> Regards,
>
> Gourav Sengupta
>
>
>
> On Fri, Jul 29, 2016 at 7:27 AM, Sumit Khanna 
> wrote:
>
> Hey,
>
>
>
> master=yarn
>
> mode=cluster
>
>
>
> spark.executor.memory=8g
>
> spark.rpc.netty.dispatcher.numThreads=2
>
>
>
> All the POC on a single node cluster. the biggest bottle neck being :
>
>
>
> 1.8 hrs to save 500k records as a parquet file/dir executing this command :
>
>
>
> *df.write.format("parquet").mode("overwrite").save(hdfspathTemp)*
>
>
>
> No doubt, the whole execution plan gets triggered on this write / save
> action. But is it the right command / set of params to save a dataframe?
>
>
>
> essentially I am doing an upsert by pulling in data from hdfs and then
> updating it with the delta changes of the current run. But not sure if
> write itself takes that much time or some optimization is needed for
> upsert. (I have that asked as another question altogether).
>
>
>
> Thanks,
>
> Sumit
>
>
>
>
>
>
>


RE: how to save spark files as parquets efficiently

2016-07-29 Thread Ewan Leith
If you replace the df.write ….

With

df.count()

in your code you’ll see how much time is taken to process the full execution 
plan without the write output.

That code below looks perfectly normal for writing a parquet file yes, there 
shouldn’t be any tuning needed for “normal” performance.
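
A crude way to see the split, as a sketch reusing the DataFrame and path from
the code below:

    val t0 = System.nanoTime()
    df.count()                                   // runs the full plan, writes nothing
    println(s"plan only: ${(System.nanoTime() - t0) / 1e9} s")

    val t1 = System.nanoTime()
    df.write.format("parquet").mode("overwrite").save(hdfspathTemp)
    println(s"plan + write: ${(System.nanoTime() - t1) / 1e9} s")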

Thanks,
Ewan

From: Sumit Khanna [mailto:sumit.kha...@askme.in]
Sent: 29 July 2016 13:41
To: Gourav Sengupta 
Cc: user 
Subject: Re: how to save spark files as parquets efficiently

Hey Gourav,

Well so I think that it is my execution plan that is at fault. So basically 
df.write as a spark job on localhost:4040/ well being an action will include 
the time taken for all the umpteen transformation on it right? All I wanted to 
know is "what apt env/config params are needed to something simple read a 
dataframe from parquet and save it back as another parquet (meaning vanilla 
load/store no transformation). Is it good enough to simply read. and write. in 
the very format mentioned in spark tutorial docs i.e

df.write.format("parquet").mode("overwrite").save(hdfspathTemp) ??

Thanks,

On Fri, Jul 29, 2016 at 4:22 PM, Gourav Sengupta 
> wrote:
Hi,

The default write format in SPARK is parquet. And I have never faced any issues 
writing over a billion records in SPARK. Are you using virtualization by any 
chance or an obsolete hard disk or Intel Celeron may be?
Regards,
Gourav Sengupta

On Fri, Jul 29, 2016 at 7:27 AM, Sumit Khanna 
> wrote:
Hey,

master=yarn
mode=cluster

spark.executor.memory=8g
spark.rpc.netty.dispatcher.numThreads=2

All the POC on a single node cluster. the biggest bottle neck being :

1.8 hrs to save 500k records as a parquet file/dir executing this command :


df.write.format("parquet").mode("overwrite").save(hdfspathTemp)

No doubt, the whole execution plan gets triggered on this write / save action. 
But is it the right command / set of params to save a dataframe?

essentially I am doing an upsert by pulling in data from hdfs and then updating 
it with the delta changes of the current run. But not sure if write itself 
takes that much time or some optimization is needed for upsert. (I have that 
asked as another question altogether).

Thanks,
Sumit





Re: correct / efficient manner to upsert / update in hdfs (via spark / in general)

2016-07-29 Thread ayan guha
Thanks Sumit, please post back how your test with HBase goes.



On Fri, Jul 29, 2016 at 8:06 PM, Sumit Khanna  wrote:

> Hey Ayan,
>
> A. Create a table TGT1 as (select key,info from delta UNION ALL select
> key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
> TGT. Not in can be written other variations using Outer Join
> B. Assuming SRC and TGT have a timestamp,
>   B.1. Select latest records from UNION ALL(SRC,TGT) using RANK()
> OVER PARTITION BY (Key order by timestamp desc)
>   B.2. Create TGT1 from B.1. Rename TGT1 to TGT2
>
> Well how we approached this was to broadcast the primary keys, since they
> say is better because a smaller table (we make sure that our run frequency
> is shrunk enlarged based on traffic somehow) so much so that the
> cardinality | unique delta primary keys | is a small and broadcastable
> number indeed. Then what follows next is a filter function on each executor
> which has the keys to be upserted against , all with them(I believe in
> memory, broadcast writes the keys in executor memory isnt it ? ). As in,
> that was the only optimization I could think of. with option A, as well as
> B, there are likely to be huge shuffle costs (shuffleHashJoin)s right?
>
> 1.  if updates are fairly spred across keys, the scheme does not give much
> benefit as number of partition read ~= total number of partition.
> 2.  This scheme often shows long tail problem (Think 1 key changed in a
> partition).
>
> 1. is beyond doubt true, because my any column key back in time/partition
> space may get updated in the next run. So is 2, as in we make the entire
> partition pass through the filter for only updating 1 or 2-3 affected keys.
>
> I do not think with the current use case if I can ensure that keys get
> partitioned well and delta corresponds to just one partition, that will
> happen if I only and only maintain the date-wise partitions and some
> concept of recency is observed. Let me see how HBase might efficiently
> tackle this classic upsert case.
>
> Thanks,
> Sumit
>
> On Fri, Jul 29, 2016 at 3:22 PM, ayan guha  wrote:
>
>> This is a classic case compared to hadoop vs DWH implmentation.
>>
>> Source (Delta table): SRC. Target: TGT
>>
>> Requirement: Pure Upsert, ie just keep the latest information for each
>> key.
>>
>> Options:
>>
>> A. Create a table TGT1 as (select key,info from delta UNION ALL select
>> key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
>> TGT. Not in can be written other variations using Outer Join
>> B. Assuming SRC and TGT have a timestamp,
>>   B.1. Select latest records from UNION ALL(SRC,TGT) using RANK()
>> OVER PARTITION BY (Key order by timestamp desc)
>>   B.2. Create TGT1 from B.1. Rename TGT1 to TGT2
>>
>> Both options are costly. And essentially more effort can be introduced to
>> write complex manipulations by partitioning data based on key and read only
>> partitions which are "changed". 3 issues:
>> 1.  if updates are fairly spred across keys, the scheme does not give
>> much benefit as number of partition read ~= total number of partition.
>> 2.  This scheme often shows long tail problem (Think 1 key changed in a
>> partition).
>>
>> This may be good when partition is based on keys and keys increase
>> monotonically. This adds maintenance of adding more partitions but do well
>> well to contain number of partitions read.
>>
>> My advise: Give HBase a shot. It gives UPSERT out of box. If you want
>> history, just add timestamp in the key (in reverse). Computation engines
>> easily support HBase.
>>
>> Best
>> Ayan
>>
>> On Fri, Jul 29, 2016 at 5:03 PM, Sumit Khanna 
>> wrote:
>>
>>> Just a note, I had the delta_df keys for the filter as in NOT
>>> INTERSECTION udf broadcasted to all the worker nodes. Which I think is an
>>> efficient move enough.
>>>
>>> Thanks,
>>>
>>> On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna 
>>> wrote:
>>>
 Hey,

 the very first run :

 glossary :

 delta_df := current run / execution changes dataframe.

 def deduplicate :
 apply windowing function and group by

 def partitionDataframe(delta_df) :
 get unique keys of that data frame and then return an array of data
 frames each containing just that very same key as the column.
 this will give the above dataframe partitoned as say by date column or
 gender column or age group column etc etc.

 0. deduplicate(delta_df : delta_df [ with all unique primary  /
 deduplicating key column ]
 1. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
 2. write the dataframe to corresponding parent hdfs path + partiton dir_

 subsequent runs :

 for each partition :
 0. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
 1. load df from previous hdfs location of that partition
 2. filter 

Re: tpcds for spark2.0

2016-07-29 Thread Olivier Girardot
I have the same kind of issue (not using spark-sql-perf), just trying to deploy
2.0.0 on mesos. I'll keep you posted as I investigate





On Wed, Jul 27, 2016 1:06 PM, kevin kiss.kevin...@gmail.com wrote:
hi, all: I want to run a test of the TPC-DS 99 SQL queries on Spark 2.0. I use
https://github.com/databricks/spark-sql-perf
With the master version, when I run: val tpcds = new TPCDS (sqlContext =
sqlContext) I got this error:
scala> val tpcds = new TPCDS (sqlContext = sqlContext)
error: missing or invalid dependency detected while loading class file
'Benchmarkable.class'.
Could not access term typesafe in package com,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the 
problematic classpath.)
A full rebuild may help if 'Benchmarkable.class' was compiled against an
incompatible version of com.
error: missing or invalid dependency detected while loading class file
'Benchmarkable.class'.
Could not access term scalalogging in value com.typesafe,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the 
problematic classpath.)
A full rebuild may help if 'Benchmarkable.class' was compiled against an
incompatible version of com.typesafe.

about spark-sql-perf-0.4.3 when I run 
:tables.genData("hdfs://master1:9000/tpctest",
"parquet", true, false, false, false, false) I got error:
Generating table catalog_sales in database to
hdfs://master1:9000/tpctest/catalog_sales with save mode Overwrite. 16/07/27 
18:59:59 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
slave1): java.lang.ClassCastException: cannot assign instance of
scala.collection.immutable.List$SerializationProxy to field 
org.apache.spark.rdd.RDD.org $apache$spark$rdd$RDD$$dependencies_ of type 
scala.collection.Seq in instance
of org.apache.spark.rdd.MapPartitionsRDD


Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: estimation of necessary time of execution

2016-07-29 Thread Mich Talebzadeh
hi,

what is that function in Hive as a matter of interest?

thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 29 July 2016 at 13:08, pseudo oduesp  wrote:

> Hi,
>
> on hive we have a awosome function for estimation of time of execution
> before launch ?
>
> in spark can find any function to estimate the time of lineage of spark
> dag execution ?
>
> Thanks
>
>


Re: how to save spark files as parquets efficiently

2016-07-29 Thread Sumit Khanna
Hey Gourav,

Well, I think it is my execution plan that is at fault. Basically df.write,
being an action, shows up as a Spark job on localhost:4040 and will include
the time taken for all the umpteen transformations before it, right? All I
wanted to know is what env/config params are needed for something simple:
read a dataframe from parquet and save it back as another parquet (vanilla
load/store, no transformation). Is it good enough to simply read and write
in the format mentioned in the Spark tutorial docs, i.e.

df.write.format("parquet").mode("overwrite").save(hdfspathTemp) ??

Thanks,

On Fri, Jul 29, 2016 at 4:22 PM, Gourav Sengupta 
wrote:

> Hi,
>
> The default write format in SPARK is parquet. And I have never faced any
> issues writing over a billion records in SPARK. Are you using
> virtualization by any chance or an obsolete hard disk or Intel Celeron may
> be?
>
> Regards,
> Gourav Sengupta
>
> On Fri, Jul 29, 2016 at 7:27 AM, Sumit Khanna 
> wrote:
>
>> Hey,
>>
>> master=yarn
>> mode=cluster
>>
>> spark.executor.memory=8g
>> spark.rpc.netty.dispatcher.numThreads=2
>>
>> All the POC on a single node cluster. the biggest bottle neck being :
>>
>> 1.8 hrs to save 500k records as a parquet file/dir executing this command
>> :
>>
>> df.write.format("parquet").mode("overwrite").save(hdfspathTemp)
>>
>>
>> No doubt, the whole execution plan gets triggered on this write / save
>> action. But is it the right command / set of params to save a dataframe?
>>
>> essentially I am doing an upsert by pulling in data from hdfs and then
>> updating it with the delta changes of the current run. But not sure if
>> write itself takes that much time or some optimization is needed for
>> upsert. (I have that asked as another question altogether).
>>
>> Thanks,
>> Sumit
>>
>>
>


estimation of necessary time of execution

2016-07-29 Thread pseudo oduesp
Hi,

In Hive we have an awesome function for estimating the execution time
before launch.

In Spark, can I find any function to estimate the execution time of the
lineage of a Spark DAG?

Thanks


Re: Spark 2.0 Build Failed

2016-07-29 Thread Ascot Moss
I think my maven is broken, I used another node in the cluster to compile
2.0.0 and got "successful"


[INFO]

[INFO] --- maven-source-plugin:2.4:jar-no-fork (create-source-jar) @
java8-tests_2.11 ---

[INFO]

[INFO] --- maven-source-plugin:2.4:test-jar-no-fork (create-source-jar) @
java8-tests_2.11 ---

[INFO]


[INFO] Reactor Summary:

[INFO]

[INFO] Spark Project Parent POM ... SUCCESS [
1.972 s]

[INFO] Spark Project Tags . SUCCESS [
3.280 s]

[INFO] Spark Project Sketch ... SUCCESS [
5.449 s]

[INFO] Spark Project Networking ... SUCCESS [
5.197 s]

[INFO] Spark Project Shuffle Streaming Service  SUCCESS [
3.816 s]

[INFO] Spark Project Unsafe ... SUCCESS [
8.275 s]

[INFO] Spark Project Launcher . SUCCESS [
5.229 s]

[INFO] Spark Project Core . SUCCESS [01:41
min]

[INFO] Spark Project GraphX ... SUCCESS [
25.111 s]

[INFO] Spark Project Streaming  SUCCESS [
54.290 s]

[INFO] Spark Project Catalyst . SUCCESS [01:15
min]

[INFO] Spark Project SQL .. SUCCESS [01:55
min]

[INFO] Spark Project ML Local Library . SUCCESS [
16.132 s]

[INFO] Spark Project ML Library ... SUCCESS [01:32
min]

[INFO] Spark Project Tools  SUCCESS [
5.136 s]

[INFO] Spark Project Hive . SUCCESS [
53.472 s]

[INFO] Spark Project REPL . SUCCESS [
11.716 s]

[INFO] Spark Project YARN Shuffle Service . SUCCESS [
4.102 s]

[INFO] Spark Project YARN . SUCCESS [
26.685 s]

[INFO] Spark Project Hive Thrift Server ... SUCCESS [
23.611 s]

[INFO] Spark Project Assembly . SUCCESS [
1.342 s]

[INFO] Spark Project External Flume Sink .. SUCCESS [
9.630 s]

[INFO] Spark Project External Flume ... SUCCESS [
15.323 s]

[INFO] Spark Project External Flume Assembly .. SUCCESS [
1.434 s]

[INFO] Spark Integration for Kafka 0.8  SUCCESS [
20.958 s]

[INFO] Spark Project Examples . SUCCESS [
22.080 s]

[INFO] Spark Project External Kafka Assembly .. SUCCESS [
2.421 s]

[INFO] Spark Integration for Kafka 0.10 ... SUCCESS [
16.255 s]

[INFO] Spark Integration for Kafka 0.10 Assembly .. SUCCESS [
2.578 s]

[INFO] Spark Project Java 8 Tests . SUCCESS [
5.573 s]

[INFO]


[INFO] BUILD SUCCESS

[INFO]


[INFO] Total time: 12:16 min

[INFO] Finished at: 2016-07-29T15:58:41+08:00

[INFO] Final Memory: 127M/4210M

[INFO]






I tried to reinstall my maven 3.3.9, deleted .m2, still got following error.

[ERROR] Plugin org.scalastyle:scalastyle-maven-plugin:0.8.0 or one of its
dependencies could not be resolved: Failed to read artifact descriptor for
org.scalastyle:scalastyle-maven-plugin:jar:0.8.0: Could not transfer
artifact org.scalastyle:scalastyle-maven-plugin:pom:0.8.0 from/to central (
https://repo1.maven.org/maven2): sun.security.validator.ValidatorException:
PKIX path building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to find
valid certification path to requested target -> [Help 1]

[ERROR]

[ERROR] To see the full stack trace of the errors, re-run Maven with the -e
switch.

[ERROR] Re-run Maven using the -X switch to enable full debug logging.

[ERROR]

[ERROR] For more information about the errors and possible solutions,
please read the following articles:

[ERROR] [Help 1]
http://cwiki.apache.org/confluence/display/MAVEN/PluginResolutionException

On Fri, Jul 29, 2016 at 1:46 PM, Ascot Moss  wrote:

> I just run
>
> wget https://repo1.maven.org/maven2/org/apache/apache/14/apache-14.pom,
> can get it without issue.
>
> On Fri, Jul 29, 2016 at 1:44 PM, Ascot Moss  wrote:
>
>> Hi thanks!
>>
>> mvn dependency:tree
>>
>> [INFO] Scanning for projects...
>>
>> Downloading:
>> https://repo1.maven.org/maven2/org/apache/apache/14/apache-14.pom
>>
>> [ERROR] [ERROR] Some problems were encountered while processing the POMs:
>>
>> [FATAL] Non-resolvable parent POM for
>> org.apache.spark:spark-parent_2.11:2.0.0: Could not transfer artifact
>> org.apache:apache:pom:14 from/to central (https://repo1.maven.org/maven2):
>> sun.security.validator.ValidatorException: PKIX path building failed:
>> 

Re: how to save spark files as parquets efficiently

2016-07-29 Thread Gourav Sengupta
Hi,

The default write format in SPARK is parquet. And I have never faced any
issues writing over a billion records in SPARK. Are you using
virtualization by any chance or an obsolete hard disk or Intel Celeron may
be?

Regards,
Gourav Sengupta

On Fri, Jul 29, 2016 at 7:27 AM, Sumit Khanna  wrote:

> Hey,
>
> master=yarn
> mode=cluster
>
> spark.executor.memory=8g
> spark.rpc.netty.dispatcher.numThreads=2
>
> All the POC on a single node cluster. the biggest bottle neck being :
>
> 1.8 hrs to save 500k records as a parquet file/dir executing this command :
>
> df.write.format("parquet").mode("overwrite").save(hdfspathTemp)
>
>
> No doubt, the whole execution plan gets triggered on this write / save
> action. But is it the right command / set of params to save a dataframe?
>
> essentially I am doing an upsert by pulling in data from hdfs and then
> updating it with the delta changes of the current run. But not sure if
> write itself takes that much time or some optimization is needed for
> upsert. (I have that asked as another question altogether).
>
> Thanks,
> Sumit
>
>


Re: how to save spark files as parquets efficiently

2016-07-29 Thread Sumit Khanna
Hey,

So I believe this is the right format to save the file, as in the optimization
is never in the write part but in the head / body of my execution plan, isn't
it?

Thanks,

On Fri, Jul 29, 2016 at 11:57 AM, Sumit Khanna 
wrote:

> Hey,
>
> master=yarn
> mode=cluster
>
> spark.executor.memory=8g
> spark.rpc.netty.dispatcher.numThreads=2
>
> All the POC on a single node cluster. the biggest bottle neck being :
>
> 1.8 hrs to save 500k records as a parquet file/dir executing this command :
>
> df.write.format("parquet").mode("overwrite").save(hdfspathTemp)
>
>
> No doubt, the whole execution plan gets triggered on this write / save
> action. But is it the right command / set of params to save a dataframe?
>
> essentially I am doing an upsert by pulling in data from hdfs and then
> updating it with the delta changes of the current run. But not sure if
> write itself takes that much time or some optimization is needed for
> upsert. (I have that asked as another question altogether).
>
> Thanks,
> Sumit
>
>


Re: correct / efficient manner to upsert / update in hdfs (via spark / in general)

2016-07-29 Thread Sumit Khanna
Hey Ayan,

A. Create a table TGT1 as (select key,info from delta UNION ALL select
key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
TGT. Not in can be written other variations using Outer Join
B. Assuming SRC and TGT have a timestamp,
  B.1. Select latest records from UNION ALL(SRC,TGT) using RANK()
OVER PARTITION BY (Key order by timestamp desc)
  B.2. Create TGT1 from B.1. Rename TGT1 to TGT2

The way we approached this was to broadcast the primary keys, since they say
that is better with a smaller table (we make sure that our run frequency is
shrunk or enlarged based on traffic) so that the cardinality |unique delta
primary keys| is a small, broadcastable number indeed. What follows next is a
filter function on each executor, which has the keys to be upserted against,
all of them in memory I believe (broadcast writes the keys into executor
memory, doesn't it?). That was the only optimization I could think of. With
option A, as well as B, there are likely to be huge shuffle costs
(shuffleHashJoins), right?
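
A condensed sketch of that filter step in Scala (names are assumptions: oldDF is
the previously stored partition, deltaDF is the current run's changes, both with
a primary-key column "key", and sc is the SparkContext):

    import org.apache.spark.sql.functions.{col, udf}

    val deltaKeys = sc.broadcast(deltaDF.select("key").map(_.getString(0)).collect().toSet)
    val notInDelta = udf((k: String) => !deltaKeys.value.contains(k))
    // rows from the delta win; untouched old rows pass the broadcast filter and survive
    val upserted = deltaDF.unionAll(oldDF.filter(notInDelta(col("key"))))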

1.  if updates are fairly spread across keys, the scheme does not give much
benefit as number of partition read ~= total number of partition.
2.  This scheme often shows long tail problem (Think 1 key changed in a
partition).

1. is beyond doubt true, because any key of mine back in time/partition
space may get updated in the next run. So is 2, as we make the entire
partition pass through the filter only to update 1 or 2-3 affected keys.

With the current use case I do not think I can ensure that keys are
partitioned well and that the delta corresponds to just one partition; that
would only happen if I maintained date-wise partitions and some concept of
recency were observed. Let me see how HBase might efficiently tackle this
classic upsert case.

Thanks,
Sumit

On Fri, Jul 29, 2016 at 3:22 PM, ayan guha  wrote:

> This is a classic case compared to hadoop vs DWH implmentation.
>
> Source (Delta table): SRC. Target: TGT
>
> Requirement: Pure Upsert, ie just keep the latest information for each
> key.
>
> Options:
>
> A. Create a table TGT1 as (select key,info from delta UNION ALL select
> key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
> TGT. Not in can be written other variations using Outer Join
> B. Assuming SRC and TGT have a timestamp,
>   B.1. Select latest records from UNION ALL(SRC,TGT) using RANK()
> OVER PARTITION BY (Key order by timestamp desc)
>   B.2. Create TGT1 from B.1. Rename TGT1 to TGT2
>
> Both options are costly. And essentially more effort can be introduced to
> write complex manipulations by partitioning data based on key and read only
> partitions which are "changed". 3 issues:
> 1.  if updates are fairly spred across keys, the scheme does not give much
> benefit as number of partition read ~= total number of partition.
> 2.  This scheme often shows long tail problem (Think 1 key changed in a
> partition).
>
> This may be good when partition is based on keys and keys increase
> monotonically. This adds maintenance of adding more partitions but do well
> well to contain number of partitions read.
>
> My advise: Give HBase a shot. It gives UPSERT out of box. If you want
> history, just add timestamp in the key (in reverse). Computation engines
> easily support HBase.
>
> Best
> Ayan
>
> On Fri, Jul 29, 2016 at 5:03 PM, Sumit Khanna 
> wrote:
>
>> Just a note, I had the delta_df keys for the filter as in NOT
>> INTERSECTION udf broadcasted to all the worker nodes. Which I think is an
>> efficient move enough.
>>
>> Thanks,
>>
>> On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna 
>> wrote:
>>
>>> Hey,
>>>
>>> the very first run :
>>>
>>> glossary :
>>>
>>> delta_df := current run / execution changes dataframe.
>>>
>>> def deduplicate :
>>> apply windowing function and group by
>>>
>>> def partitionDataframe(delta_df) :
>>> get unique keys of that data frame and then return an array of data
>>> frames each containing just that very same key as the column.
>>> this will give the above dataframe partitoned as say by date column or
>>> gender column or age group column etc etc.
>>>
>>> 0. deduplicate(delta_df : delta_df [ with all unique primary  /
>>> deduplicating key column ]
>>> 1. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
>>> 2. write the dataframe to corresponding parent hdfs path + partiton dir_
>>>
>>> subsequent runs :
>>>
>>> for each partition :
>>> 0. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
>>> 1. load df from previous hdfs location of that partition
>>> 2. filter the above df(p) where p is the partiton no. such that keys not
>>> present in delta_df(p) of current run. i.e get df(p)[primary column] not in
>>> delta_df(p). done via a basic ! in UDF.
>>> 3. delta_df.unionAll(filtered df above).
>>> 4. persist the output of 3. as df.write.mode.format.

Re: correct / efficient manner to upsert / update in hdfs (via spark / in general)

2016-07-29 Thread ayan guha
This is a classic case when comparing Hadoop vs DWH implementations.

Source (Delta table): SRC. Target: TGT

Requirement: Pure Upsert, ie just keep the latest information for each key.

Options:

A. Create a table TGT1 as (select key,info from delta UNION ALL select
key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
TGT. Not in can be written other variations using Outer Join
B. Assuming SRC and TGT have a timestamp,
  B.1. Select latest records from UNION ALL(SRC,TGT) using RANK()
OVER PARTITION BY (Key order by timestamp desc)
  B.2. Create TGT1 from B.1. Rename TGT1 to TGT2
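
A compact sketch of option B with DataFrames and a window function (Scala,
column names key/info/ts are assumptions):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    object UpsertSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("upsert-sketch").setMaster("local[*]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // Hypothetical delta (SRC) and existing target (TGT), both keyed and timestamped.
        val src = Seq(("k1", "new-info", 2L)).toDF("key", "info", "ts")
        val tgt = Seq(("k1", "old-info", 1L), ("k2", "unchanged", 1L)).toDF("key", "info", "ts")

        // Latest record per key from UNION ALL(SRC, TGT), ranked by timestamp descending.
        val w = Window.partitionBy("key").orderBy(col("ts").desc)
        val upserted = src.unionAll(tgt)
          .withColumn("rn", row_number().over(w))
          .filter(col("rn") === 1)
          .drop("rn")

        upserted.show() // one row per key, holding the most recent info
        sc.stop()
      }
    }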

Both options are costly. And essentially more effort can be introduced to
write complex manipulations by partitioning data based on key and read only
partitions which are "changed". 3 issues:
1.  if updates are fairly spread across keys, the scheme does not give much
benefit as number of partition read ~= total number of partition.
2.  This scheme often shows long tail problem (Think 1 key changed in a
partition).

This may be good when partition is based on keys and keys increase
monotonically. This adds maintenance of adding more partitions but do well
well to contain number of partitions read.

My advice: Give HBase a shot. It gives UPSERT out of the box. If you want
history, just add timestamp in the key (in reverse). Computation engines
easily support HBase.

Best
Ayan

On Fri, Jul 29, 2016 at 5:03 PM, Sumit Khanna  wrote:

> Just a note, I had the delta_df keys for the filter as in NOT INTERSECTION
> udf broadcasted to all the worker nodes. Which I think is an efficient move
> enough.
>
> Thanks,
>
> On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna 
> wrote:
>
>> Hey,
>>
>> the very first run :
>>
>> glossary :
>>
>> delta_df := current run / execution changes dataframe.
>>
>> def deduplicate :
>> apply windowing function and group by
>>
>> def partitionDataframe(delta_df) :
>> get unique keys of that data frame and then return an array of data
>> frames each containing just that very same key as the column.
>> this will give the above dataframe partitoned as say by date column or
>> gender column or age group column etc etc.
>>
>> 0. deduplicate(delta_df : delta_df [ with all unique primary  /
>> deduplicating key column ]
>> 1. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
>> 2. write the dataframe to corresponding parent hdfs path + partiton dir_
>>
>> subsequent runs :
>>
>> for each partition :
>> 0. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
>> 1. load df from previous hdfs location of that partition
>> 2. filter the above df(p) where p is the partiton no. such that keys not
>> present in delta_df(p) of current run. i.e get df(p)[primary column] not in
>> delta_df(p). done via a basic ! in UDF.
>> 3. delta_df.unionAll(filtered df above).
>> 4. persist the output of 3. as df.write.mode.format.
>>
>> Is this the right way of doing the upserts partiton wise?  all in all it
>> is taking 2 hours for inserting / upserting 5ooK records in parquet format
>> in some hdfs location where each location gets mapped to one partition.
>>
>> My spark conf specs are :
>>
>> yarn cluster mode. single node.
>> spark.executor.memory 8g
>> spark.rpc.netty.dispatcher.numThreads 2
>>
>> Thanks,
>> Sumit
>>
>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: correct / efficient manner to upsert / update in hdfs (via spark / in general)

2016-07-29 Thread Sumit Khanna
Just a note: I had the delta_df keys for the NOT-IN filter UDF broadcast to
all the worker nodes, which I think is efficient enough.
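
A minimal sketch of that broadcast-based NOT-IN filter, assuming old_df is the
previously persisted data and both frames have a primary-key column named "id"
(both names are hypothetical; sc is the SparkContext):

from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

# Collect the delta keys on the driver and broadcast the set to executors.
delta_keys = set(r["id"] for r in delta_df.select("id").distinct().collect())
bc_keys = sc.broadcast(delta_keys)

# Keep only the old rows whose key is NOT present in the delta.
not_in_delta = F.udf(lambda k: k not in bc_keys.value, BooleanType())
unchanged_df = old_df.filter(not_in_delta(F.col("id")))

# Upsert = delta rows + untouched old rows.
upserted_df = delta_df.unionAll(unchanged_df)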

Thanks,

On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna 
wrote:

> Hey,
>
> the very first run :
>
> glossary :
>
> delta_df := current run / execution changes dataframe.
>
> def deduplicate :
> apply windowing function and group by
>
> def partitionDataframe(delta_df) :
> get unique keys of that data frame and then return an array of data frames
> each containing just that very same key as the column.
> this will give the above dataframe partitoned as say by date column or
> gender column or age group column etc etc.
>
> 0. deduplicate(delta_df : delta_df [ with all unique primary  /
> deduplicating key column ]
> 1. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
> 2. write the dataframe to corresponding parent hdfs path + partiton dir_
>
> subsequent runs :
>
> for each partition :
> 0. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
> 1. load df from previous hdfs location of that partition
> 2. filter the above df(p) where p is the partiton no. such that keys not
> present in delta_df(p) of current run. i.e get df(p)[primary column] not in
> delta_df(p). done via a basic ! in UDF.
> 3. delta_df.unionAll(filtered df above).
> 4. persist the output of 3. as df.write.mode.format.
>
> Is this the right way of doing the upserts partiton wise?  all in all it
> is taking 2 hours for inserting / upserting 5ooK records in parquet format
> in some hdfs location where each location gets mapped to one partition.
>
> My spark conf specs are :
>
> yarn cluster mode. single node.
> spark.executor.memory 8g
> spark.rpc.netty.dispatcher.numThreads 2
>
> Thanks,
> Sumit
>
>
>


correct / efficient manner to upsert / update in hdfs (via spark / in general)

2016-07-29 Thread Sumit Khanna
Hey,

the very first run :

glossary :

delta_df := current run / execution changes dataframe.

def deduplicate(delta_df) :
apply a windowing function and group by the deduplicating key

def partitionDataframe(delta_df) :
get the unique keys of that data frame and then return an array of data frames,
each containing just that one key value in the partitioning column.
This will give the above dataframe partitioned, say, by a date column or a
gender column or an age-group column, etc.

0. deduplicate(delta_df) : delta_df [ with all unique primary /
deduplicating key columns ]
1. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
2. write the dataframe to the corresponding parent hdfs path + partition dir_

subsequent runs :

for each partition :
0. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
1. load df from the previous hdfs location of that partition
2. filter the above df(p), where p is the partition no., to keep only the keys
not present in delta_df(p) of the current run, i.e. get df(p)[primary column]
not in delta_df(p); done via a basic negation (!) in a UDF.
3. delta_df.unionAll(filtered df above).
4. persist the output of 3. via df.write.mode(...).format(...).save(...)
(a rough sketch of one such pass follows below).
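
A rough PySpark sketch of one such per-partition pass, with a hypothetical
primary-key column "id", an update-timestamp column "updated_at" and a
partition column "dt"; sqlContext is assumed to be in scope, and all the names
and the temp-path swap are illustrative assumptions rather than the exact
production code:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def upsert_partition(delta_p, partition_value, base_path):
    # 0. Deduplicate the delta for this partition: keep the latest row per id.
    w = Window.partitionBy("id").orderBy(F.col("updated_at").desc())
    delta_p = (delta_p.withColumn("rn", F.row_number().over(w))
                      .filter(F.col("rn") == 1)
                      .drop("rn"))

    # 1. Load the previously persisted data for this partition.
    path = "{0}/dt={1}".format(base_path, partition_value)
    old_p = sqlContext.read.parquet(path)

    # 2. Drop old rows whose id also appears in the delta (left outer join +
    #    null filter; the broadcast NOT-IN UDF works here as well).
    delta_ids = delta_p.select(F.col("id").alias("d_id"))
    unchanged = (old_p.join(delta_ids, old_p["id"] == delta_ids["d_id"],
                            "left_outer")
                      .filter(delta_ids["d_id"].isNull())
                      .drop("d_id"))

    # 3. + 4. Union and persist. Write to a temp dir first: overwriting the
    #    path that is still being read from would corrupt the source.
    delta_p.unionAll(unchanged).write.mode("overwrite").parquet(path + "_tmp")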

Is this the right way of doing the upserts partition-wise? All in all it is
taking 2 hours to insert / upsert 500K records in parquet format in an hdfs
location where each location gets mapped to one partition.

My spark conf specs are :

yarn cluster mode. single node.
spark.executor.memory 8g
spark.rpc.netty.dispatcher.numThreads 2

Thanks,
Sumit


how to save spark files as parquets efficiently

2016-07-29 Thread Sumit Khanna
Hey,

master=yarn
mode=cluster

spark.executor.memory=8g
spark.rpc.netty.dispatcher.numThreads=2

All of the POC runs on a single-node cluster, the biggest bottleneck being:

1.8 hrs to save 500K records as a parquet file/dir when executing this command :

df.write.format("parquet").mode("overwrite").save(hdfspathTemp)


No doubt, the whole execution plan gets triggered on this write / save
action. But is it the right command / set of params to save a dataframe?

Essentially I am doing an upsert by pulling in data from hdfs and then
updating it with the delta changes of the current run. But I am not sure
whether the write itself takes that much time or whether some optimization is
needed for the upsert. (I have asked that as another question altogether.)
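
For what it's worth, a sketch of two knobs that are often worth checking before
blaming the write itself: whether the upstream plan gets recomputed on the
save, and how many output files are produced (the partition count below is an
arbitrary example, not a recommendation):

# Cache the result of the expensive upsert plan so that the save does not
# re-trigger the whole lineage, then control the number of output files.
df.persist()
df.count()          # materialize the plan once

(df.coalesce(8)     # 8 is an arbitrary example; tune to data volume / cores
   .write
   .format("parquet")
   .mode("overwrite")
   .save(hdfspathTemp))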

Thanks,
Sumit


Re: Spark 2.0 -- spark warehouse relative path in absolute URI error

2016-07-29 Thread Tony Lane
I am facing the same issue and am completely blocked here.
Sean, can you please help with this issue?

Migrating to 2.0.0 has really stalled our development effort.
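
Not a confirmed fix, but a workaround worth trying while SPARK-15899 is open is
to set spark.sql.warehouse.dir yourself to a well-formed absolute file URI when
building the session, so Spark does not derive the "file:C:/..." style default.
A PySpark sketch (the warehouse path is just an example):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("warehouse-workaround")
         .config("spark.sql.warehouse.dir", "file:///C:/tmp/spark-warehouse")
         .getOrCreate())

# The read that was failing, now run against the explicit warehouse setting.
df = spark.read.json("file:///C:/data/a.json")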

-Tony



> -- Forwarded message --
> From: Sean Owen 
> Date: Fri, Jul 29, 2016 at 12:47 AM
> Subject: Re: Spark 2.0 -- spark warehouse relative path in absolute URI
> error
> To: Rohit Chaddha 
> Cc: "user @spark" 
>
>
> Ah, right. This wasn't actually resolved. Yeah your input on 15899
> would be welcome. See if the proposed fix helps.
>
> On Thu, Jul 28, 2016 at 11:52 AM, Rohit Chaddha
>  wrote:
> > Sean,
> >
> > I saw some JIRA tickets and looks like this is still an open bug (rather
> > than an improvement as marked in JIRA).
> >
> > https://issues.apache.org/jira/browse/SPARK-15893
> > https://issues.apache.org/jira/browse/SPARK-15899
> >
> > I am experimenting, but do you know of any solution on top of your head
> >
> >
> >
> > On Fri, Jul 29, 2016 at 12:06 AM, Rohit Chaddha <
> rohitchaddha1...@gmail.com>
> > wrote:
> >>
> >> I am simply trying to do
> >> session.read().json("file:///C:/data/a.json");
> >>
> >> in 2.0.0-preview it was working fine with
> >> sqlContext.read().json("C:/data/a.json");
> >>
> >>
> >> -Rohit
> >>
> >> On Fri, Jul 29, 2016 at 12:03 AM, Sean Owen  wrote:
> >>>
> >>> Hm, file:///C:/... doesn't work? that should certainly be an absolute
> >>> URI with an absolute path. What exactly is your input value for this
> >>> property?
> >>>
> >>> On Thu, Jul 28, 2016 at 11:28 AM, Rohit Chaddha
> >>>  wrote:
> >>> > Hello Sean,
> >>> >
> >>> > I have tried both  file:/  and file:///
> >>> > But it does not work and gives the same error
> >>> >
> >>> > -Rohit
> >>> >
> >>> >
> >>> >
> >>> > On Thu, Jul 28, 2016 at 11:51 PM, Sean Owen 
> wrote:
> >>> >>
> >>> >> IIRC that was fixed, in that this is actually an invalid URI. Use
> >>> >> file:/C:/... I think.
> >>> >>
> >>> >> On Thu, Jul 28, 2016 at 10:47 AM, Rohit Chaddha
> >>> >>  wrote:
> >>> >> > I upgraded from 2.0.0-preview to 2.0.0
> >>> >> > and I started getting the following error
> >>> >> >
> >>> >> > Caused by: java.net.URISyntaxException: Relative path in absolute
> >>> >> > URI:
> >>> >> > file:C:/ibm/spark-warehouse
> >>> >> >
> >>> >> > Any ideas how to fix this
> >>> >> >
> >>> >> > -Rohit
> >>> >
> >>> >
> >>
> >>
> >
>
>