Re: Selecting the top 100 records per group by?

2016-09-10 Thread Kevin Burton
Looks like you can do it with dense_rank functions.

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

I set up some basic records and it seems to have done the right thing.

Now time to throw 50TB and 100 spark nodes at this problem and see what
happens :)
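
For reference, the window-function approach boils down to ranking rows inside each group and keeping the rows whose rank is at most N. Below is a minimal plain-Python sketch of that logic, not Spark code; the sample rows, the `score` ordering column and the helper name `top_n_dense_rank` are made up for illustration. Note that dense_rank gives tied rows the same rank, so a group can return more than N rows; a row_number-style ranking would cap each group at exactly N.

```python
from collections import defaultdict
from itertools import groupby

def top_n_dense_rank(rows, key, order_by, n):
    """Keep rows whose dense rank within their group is <= n.

    Mirrors the idea of:
      dense_rank() OVER (PARTITION BY key ORDER BY order_by DESC) <= n
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)

    result = []
    for group_rows in groups.values():
        # Highest order_by value first.
        ordered = sorted(group_rows, key=lambda r: r[order_by], reverse=True)
        # groupby on the sorted value yields one run per distinct value,
        # so the run index is the dense rank.
        runs = groupby(ordered, key=lambda r: r[order_by])
        for rank, (_, tied) in enumerate(runs, start=1):
            if rank > n:
                break
            result.extend(tied)
    return result

# Hypothetical sample data standing in for the posts table.
posts = [
    {"user_id": "a", "post_id": 1, "score": 10},
    {"user_id": "a", "post_id": 2, "score": 10},
    {"user_id": "a", "post_id": 3, "score": 7},
    {"user_id": "a", "post_id": 4, "score": 5},
    {"user_id": "b", "post_id": 5, "score": 3},
]

top2 = top_n_dense_rank(posts, key="user_id", order_by="score", n=2)
print(sorted(r["post_id"] for r in top2))
```

User "a" keeps three rows for n=2 because the two top posts tie on score and share rank 1.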

On Sat, Sep 10, 2016 at 7:42 PM, Kevin Burton  wrote:

> Ah.. might actually. I'll have to mess around with that.
>
> On Sat, Sep 10, 2016 at 6:06 PM, Karl Higley  wrote:
>
>> Would `topByKey` help?
>>
>> https://github.com/apache/spark/blob/master/mllib/src/main/
>> scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42
>>
>> Best,
>> Karl
>>
>> On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton  wrote:
>>
>>> I'm trying to figure out a way to group by and return the top 100
>>> records in that group.
>>>
>>> Something like:
>>>
>>> SELECT TOP(100, user_id) FROM posts GROUP BY user_id;
>>>
>>> But I can't really figure out the best way to do this...
>>>
>>> There is a FIRST and LAST aggregate function but this only returns one
>>> column.
>>>
>>> I could do something like:
>>>
>>> SELECT * FROM posts WHERE user_id IN ( /* select top users here */ )
>>> LIMIT 100;
>>>
>>> But that limit is applied for ALL the records. Not each individual user.
>>>
>>>
>>> The only other thing I can think of is to do a manual map reduce and
>>> then have the reducer only return the top 100 each time...
>>>
>>> Would LOVE some advice here...
>>>
>>> --
>>>
>>> We’re hiring if you know of any awesome Java Devops or Linux Operations
>>> Engineers!
>>>
>>> Founder/CEO Spinn3r.com
>>> Location: *San Francisco, CA*
>>> blog: http://burtonator.wordpress.com
>>> … or check out my Google+ profile
>>> 
>>>
>>>





Re: Selecting the top 100 records per group by?

2016-09-10 Thread Kevin Burton
Ah.. might actually. I'll have to mess around with that.

On Sat, Sep 10, 2016 at 6:06 PM, Karl Higley  wrote:

> Would `topByKey` help?
>
> https://github.com/apache/spark/blob/master/mllib/src/
> main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42
>
> Best,
> Karl
>
> On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton  wrote:
>
>> I'm trying to figure out a way to group by and return the top 100 records
>> in that group.
>>
>> Something like:
>>
>> SELECT TOP(100, user_id) FROM posts GROUP BY user_id;
>>
>> But I can't really figure out the best way to do this...
>>
>> There is a FIRST and LAST aggregate function but this only returns one
>> column.
>>
>> I could do something like:
>>
>> SELECT * FROM posts WHERE user_id IN ( /* select top users here */ )
>> LIMIT 100;
>>
>> But that limit is applied for ALL the records. Not each individual user.
>>
>> The only other thing I can think of is to do a manual map reduce and then
>> have the reducer only return the top 100 each time...
>>
>> Would LOVE some advice here...
>>





Re: SparkR error: reference is ambiguous.

2016-09-10 Thread Felix Cheung
Could you provide more information on how df in your example is created?
Please also include the output of printSchema(df).

This example works:
> c <- createDataFrame(cars)
> c
SparkDataFrame[speed:double, dist:double]
> c$speed <- c$dist*0
> c
SparkDataFrame[speed:double, dist:double]
> head(c)
  speed dist
1     0    2
2     0   10
3     0    4
4     0   22
5     0   16
6     0   10


From: Bedrytski Aliaksandr
Sent: Friday, September 9, 2016 9:13 PM
Subject: Re: SparkR error: reference is ambiguous.
To: xingye


Hi,

Can you use full-string queries in SparkR?
Like (in Scala):

df1.registerTempTable("df1")
df2.registerTempTable("df2")
val df3 = sqlContext.sql("SELECT * FROM df1 JOIN df2 ON df1.ra = df2.ra")

Explicitly mentioning table names in the query often solves ambiguity problems.
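
The effect of qualifying column names can be reproduced outside Spark. Here is a small sketch that uses Python's built-in sqlite3 module as a stand-in for Spark SQL. The tables `df1`/`df2` and the column `ra` follow the thread; the extra columns and the sample values are invented. An unqualified `ra` is rejected as ambiguous after the join, while `df1.ra` is accepted.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE df1 (ra INTEGER, c_mon INTEGER);
    CREATE TABLE df2 (ra INTEGER, label TEXT);
    INSERT INTO df1 VALUES (1, 10), (2, 20);
    INSERT INTO df2 VALUES (1, 'x'), (2, 'y');
""")

# Unqualified column: both joined tables have "ra", so the engine cannot choose.
try:
    conn.execute("SELECT ra FROM df1 JOIN df2 ON df1.ra = df2.ra").fetchall()
    ambiguous_error = None
except sqlite3.OperationalError as exc:
    ambiguous_error = str(exc)

# Qualified column: the table name removes the ambiguity.
rows = conn.execute(
    "SELECT df1.ra, df2.label FROM df1 JOIN df2 ON df1.ra = df2.ra ORDER BY df1.ra"
).fetchall()

print(ambiguous_error)
print(rows)
```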

Regards
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Fri, Sep 9, 2016, at 19:33, xingye wrote:

I'm not sure whether this is the right mailing list for my question.
If not, can someone point me to a list where I can find help?


I keep getting a "reference is ambiguous" error when writing some SparkR
code.


1. When I tried to assign values to a column using an existing column:

df$c_mon <- df$ra*0

16/09/09 15:11:28 ERROR RBackendHandler: col on 3101 failed
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Reference 'ra' is ambiguous, could be: ra#8146, ra#13501.;

2. When I joined two Spark data frames on the key:

df3 <- join(df1, df2, df1$ra == df2$ra, "left")

16/09/09 14:48:07 WARN Column: Constructing trivially true equals predicate, 'ra#8146 = ra#8146'. Perhaps you need to use aliases.

"ra" is the actual column name; I don't know why SparkR keeps reporting
errors about ra#8146 or ra#13501.

Can someone help?

Thanks





RE: Graphhopper/routing in Spark

2016-09-10 Thread Kane O'Donnell
It’s not obvious to me either = ) I was thinking more along the lines of 
retrieving the graph from HDFS/Spark, merging it together (which should be 
taken care of by sc.textFile) and then giving it to GraphHopper. Alternatively 
I guess I could just put the graph locally on every worker node. Or broadcast 
it … I must be able to just broadcast a chunk of byte data? (On disk, the
contracted graph is only 30 MB.)

I hadn’t considered GraphX. It doesn’t look suitable as it’s likely to be 
considerably slower, and not do all of the nice stuff GraphHopper does (e.g. 
vehicle specific stuff, including importing and processing OSM data).

Kane


Kane O'Donnell
Data Scientist

From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Friday, 9 September 2016 7:08 p.m.
To: Kane O'Donnell
Cc: user@spark.apache.org
Subject: Re: Graphhopper/routing in Spark

It’s not obvious to me how that would work. In principle I imagine you could
have your source data loaded into HDFS and read by GraphHopper instances
running on Spark workers. But a graph by its nature has items that have
connections to potentially any other item, so GraphHopper instances would need
a way of dealing with that, and I presume GraphHopper is not designed
that way. Spark’s graph processing library, GraphX, was designed that way, and
plenty of thought has gone into how to distribute a graph across machines and
still have a way of running algorithms.
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action




On 8 Sep 2016, at 22:45, kodonnell wrote:

Just wondering if anyone has experience at running Graphhopper (or similar)
in Spark?

In short, I can get it running in the master, but not in worker nodes. The
key trouble seems to be that Graphhopper depends on a pre-processed graph,
which it obtains from OSM data. In normal (desktop) use, it pre-processes,
and then caches to disk. My current thinking is that I could create the
cache locally, and then put it in HDFS, and tweak Graphhopper to read from
the HDFS source. Alternatively I could try to broadcast the cache (or the
entire Graphhopper instance) - though I believe that would require both
being serializable (which I've got little clue about). Does anyone have any
recommendations on the above?

In addition, I'm not quite sure how to structure it to minimise the cache
reading - I don't want to have to read the cache (and initialise
Graphhopper) for e.g. every route, as that's likely to be slow. It'd be nice
if this was only done once (e.g. for each partition) and then all the routes
in the partition processed with the same Graphhopper instance. Again, any
thoughts on this?
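
The "initialise once per partition" idea can be sketched independently of GraphHopper. In Spark, `mapPartitions` has this shape: the function receives an iterator over a whole partition, so expensive setup can run once before the loop. Below is a plain-Python sketch in which `ExpensiveRouter` is a hypothetical stand-in for a loaded GraphHopper instance, the cache path is invented, and the partitions are ordinary lists.

```python
class ExpensiveRouter:
    """Hypothetical stand-in for a routing engine with costly start-up."""
    instances_created = 0

    def __init__(self, graph_path):
        # Imagine reading the pre-processed graph cache from graph_path here.
        ExpensiveRouter.instances_created += 1
        self.graph_path = graph_path

    def route(self, start, end):
        # Placeholder result; a real engine would return a path.
        return (start, end, abs(end - start))


def route_partition(requests, graph_path="/tmp/graph-cache"):
    """Process one partition: build the router once, then reuse it per request."""
    router = ExpensiveRouter(graph_path)
    for start, end in requests:
        yield router.route(start, end)


# Three "partitions" of routing requests (made-up values).
partitions = [[(0, 5), (2, 3)], [(10, 4)], [(1, 1), (7, 9), (8, 2)]]
results = [list(route_partition(p)) for p in partitions]

print(ExpensiveRouter.instances_created)  # one router per partition, not per route
print(results)
```

With this shape the setup cost scales with the number of partitions rather than the number of routes.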

FYI, there is also a discussion on the GraphHopper forum, though no luck there.






Re: questions about using dapply

2016-09-10 Thread Felix Cheung
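You need MARGIN capitalized. R argument names are case-sensitive, so Margin=2
is not matched to MARGIN; the function then fills the MARGIN slot positionally
and FUN is left missing, which is the error you see. This example works:

c <- as.DataFrame(cars)
# rename the columns to c1, c2
c <- selectExpr(c, "speed as c1", "dist as c2")
cols_in <- dapplyCollect(c,
  function(x) {
    apply(x[, paste("c", 1:2, sep = "")], MARGIN = 2,
          FUN = function(y) { y %in% c(61, 99) })
  })
# dapplyCollect does not require the schema parameter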


From: xingye
Sent: Friday, September 9, 2016 10:35 AM
Subject: questions about using dapply



I have a question about using UDFs in SparkR. I'm converting some R code to
SparkR.

* The original R code is:

cols_in <- apply(df[, paste("cr_cd", 1:12, sep = "")], MARGIN = 2, FUN = "%in%", c(61, 99))

* If I use dapply and pass the original apply call as the function for dapply:

cols_in <- dapply(df,
  function(x) { apply(x[, paste("cr_cd", 1:12, sep = "")], Margin=2, function(y){ y %in% c(61, 99)}) },
  schema)

The error is: Error in match.fun(FUN) : argument "FUN" is missing, with no
default

* If I use spark.lapply, it still shows an error. It seems that in Spark the
column cr_cd1 is ambiguous.

cols_in <- spark.lapply(df[, paste("cr_cd", 1:12, sep = "")], function(x){ x %in% c(61, 99)})

16/09/08 ERROR RBackendHandler: select on 3101 failed
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Reference 'cr_cd1' is ambiguous, could be: cr_cd1#2169L, cr_cd1#17787L.;

* If I use dapplyCollect, it works, but it will lead to memory issues if the
data is big. How can I make dapply work in my case?

wrapper = function(df){
  out = apply(df[, paste("cr_cd", 1:12, sep = "")], MARGIN = 2, FUN = "%in%", c(61, 99))
  return(out)
}

cols_in <- dapplyCollect(df, wrapper)




Re: SparkR API problem with subsetting distributed data frame

2016-09-10 Thread Felix Cheung
How are you calling dirs()? What would x be? Is dat a SparkDataFrame?

With SparkR, i in dat[i, 4] should be a logical expression that selects rows, e.g.
df[df$age %in% c(19, 30), 1:2]
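
Positional lookups such as dat[x, 4] and dat[x+1, 4] assume a local, ordered data frame. One way to restate the computation is as a function over consecutive row pairs. The sketch below does that in plain Python on local data, using the same rhumb-line formula as the quoted `direction` function. The function names and the sample coordinates are illustrative, and this is not SparkR code.

```python
import math

def bearing(start_lat, start_lon, end_lat, end_lon):
    """Rhumb-line bearing in degrees [0, 360) between two points given in degrees."""
    lat1, lon1 = math.radians(start_lat), math.radians(start_lon)
    lat2, lon2 = math.radians(end_lat), math.radians(end_lon)
    d_lon = lon2 - lon1
    d_phi = math.log(math.tan(lat2 / 2 + math.pi / 4) / math.tan(lat1 / 2 + math.pi / 4))
    # Take the shorter way around the globe.
    if abs(d_lon) > math.pi:
        d_lon = -(2 * math.pi - d_lon) if d_lon > 0 else 2 * math.pi + d_lon
    # Convert to degrees first, then normalise into [0, 360).
    return (math.degrees(math.atan2(d_lon, d_phi)) + 360) % 360

def bearings(points):
    """Bearing from each (lat, lon) point to the next one, in order."""
    return [bearing(a[0], a[1], b[0], b[1]) for a, b in zip(points, points[1:])]

# Made-up track: east, then north, then south.
track = [(52.0, 9.0), (52.0, 9.1), (52.1, 9.1), (52.0, 9.1)]
print([round(b, 1) for b in bearings(track)])
```

Pairing each row with its successor (`zip(points, points[1:])`) replaces the x and x+1 indexing, which is the part a distributed data frame does not support directly.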





On Sat, Sep 10, 2016 at 11:02 AM -0700, "Bene" wrote:

Here are a few code snippets:

The data frame looks like this:

  kfz                zeit      datum latitude longitude
1   # 2015-02-09 07:18:33 2015-02-09 52.35234  9.881965
2   # 2015-02-09 07:18:34 2015-02-09 52.35233  9.881970
3   # 2015-02-09 07:18:35 2015-02-09 52.35232  9.881975
4   # 2015-02-09 07:18:36 2015-02-09 52.35232  9.881972
5   # 2015-02-09 07:18:37 2015-02-09 52.35231  9.881973
6   # 2015-02-09 07:18:38 2015-02-09 52.35231  9.881978

I call this function with a number (position in the data frame) and a data
frame:

dirs <- function(x, dat){
  direction(startLat = dat[x,4], endLat = dat[x+1,4], startLon = dat[x,5],
endLon = dat[x+1,5])
}

Here I get the error with the S4 class not subsettable. This function calls
another function which does the actual calculation:

direction <- function(startLat, endLat, startLon, endLon){
  startLat <- degrees.to.radians(startLat);
  startLon <- degrees.to.radians(startLon);
  endLat <- degrees.to.radians(endLat);
  endLon <- degrees.to.radians(endLon);
  dLon <- endLon - startLon;

  dPhi <- log(tan(endLat / 2 + pi / 4) / tan(startLat / 2 + pi / 4));
  if (abs(dLon) > pi) {
    if (dLon > 0) {
      dLon <- -(2 * pi - dLon);
    } else {
      dLon <- (2 * pi + dLon);
    }
  }
  # convert to degrees first, then normalise into [0, 360)
  bearing <- (radians.to.degrees(atan2(dLon, dPhi)) + 360) %% 360;
  return (bearing);
}


Anything more you need?






Re: Assign values to existing column in SparkR

2016-09-10 Thread Felix Cheung
If you are to set a column to 0 (essentially remove and replace the existing 
one) you would need to put a column on the right hand side:


> df <- as.DataFrame(iris)
> head(df)
  Sepal_Length Sepal_Width Petal_Length Petal_Width Species
1          5.1         3.5          1.4         0.2  setosa
2          4.9         3.0          1.4         0.2  setosa
3          4.7         3.2          1.3         0.2  setosa
4          4.6         3.1          1.5         0.2  setosa
5          5.0         3.6          1.4         0.2  setosa
6          5.4         3.9          1.7         0.4  setosa
> df$Petal_Length <- 0
Error: class(value) == "Column" || is.null(value) is not TRUE
> df$Petal_Length <- lit(0)
> head(df)
  Sepal_Length Sepal_Width Petal_Length Petal_Width Species
1          5.1         3.5            0         0.2  setosa
2          4.9         3.0            0         0.2  setosa
3          4.7         3.2            0         0.2  setosa
4          4.6         3.1            0         0.2  setosa
5          5.0         3.6            0         0.2  setosa
6          5.4         3.9            0         0.4  setosa

From: Deepak Sharma
Sent: Friday, September 9, 2016 12:29 PM
Subject: Re: Assign values to existing column in SparkR
To: xingye


Data frames are immutable by nature, so I don't think you can directly assign
or change values in a column.

Thanks
Deepak

On Fri, Sep 9, 2016 at 10:59 PM, xingye wrote:

I have a question about assigning values to a Spark data frame. I want to
assign values to an existing column of a Spark data frame, but if I assign the
value directly, I get the following error:

df$c_mon <- 0
Error: class(value) == "Column" || is.null(value) is not TRUE

Is there a way to solve this?



--
Thanks
Deepak
www.bigdatabig.com
www.keosha.net




Re: Selecting the top 100 records per group by?

2016-09-10 Thread Karl Higley
Would `topByKey` help?

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42

Best,
Karl
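
The idea behind a per-key top-k is to keep only a small bounded heap for each key instead of sorting whole groups. Here is a plain-Python sketch of that idea; it is not the MLlib implementation, and `top_by_key` and the sample pairs are hypothetical names used for illustration.

```python
import heapq
from collections import defaultdict

def top_by_key(pairs, k):
    """Return {key: the k largest values, largest first} using bounded min-heaps."""
    heaps = defaultdict(list)
    for key, value in pairs:
        heap = heaps[key]
        if len(heap) < k:
            heapq.heappush(heap, value)
        elif value > heap[0]:
            # heap[0] is the smallest retained value; swap it for the larger one.
            heapq.heapreplace(heap, value)
    return {key: sorted(heap, reverse=True) for key, heap in heaps.items()}

# Made-up (user, score) pairs.
pairs = [("u1", 5), ("u1", 9), ("u1", 1), ("u2", 4), ("u1", 7), ("u2", 8)]
print(top_by_key(pairs, k=2))
```

Memory per key stays at k values no matter how many records the key has, which matters when a few users own most of the posts.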

On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton  wrote:

> I'm trying to figure out a way to group by and return the top 100 records
> in that group.
>
> Something like:
>
> SELECT TOP(100, user_id) FROM posts GROUP BY user_id;
>
> But I can't really figure out the best way to do this...
>
> There is a FIRST and LAST aggregate function but this only returns one
> column.
>
> I could do something like:
>
> SELECT * FROM posts WHERE user_id IN ( /* select top users here */ ) LIMIT
> 100;
>
> But that limit is applied for ALL the records. Not each individual user.
>
> The only other thing I can think of is to do a manual map reduce and then
> have the reducer only return the top 100 each time...
>
> Would LOVE some advice here...
>


Selecting the top 100 records per group by?

2016-09-10 Thread Kevin Burton
I'm trying to figure out a way to group by and return the top 100 records
in that group.

Something like:

SELECT TOP(100, user_id) FROM posts GROUP BY user_id;

But I can't really figure out the best way to do this...

There is a FIRST and LAST aggregate function but this only returns one
column.

I could do something like:

SELECT * FROM posts WHERE user_id IN ( /* select top users here */ ) LIMIT
100;

But that limit is applied for ALL the records. Not each individual user.

The only other thing I can think of is to do a manual map reduce and then
have the reducer only return the top 100 each time...

Would LOVE some advice here...




Re: Spark_JDBC_Partitions

2016-09-10 Thread Mich Talebzadeh
Good points

Unfortunately Data Pump, exp and imp use a binary format for import and
export that cannot be used to load data into HDFS in a suitable way.

One can use what is known as a flat.sh script to get the data out as tab- or
comma-separated text, etc.

ROWNUM is a pseudocolumn (not a real column) that is available in a query.
The issue is that in a table of 280 million rows, getting the position of a
row requires a table scan, since no index can be built on it
(assuming there is no other suitable index). Not ideal, but it can be done.

I think a better alternative is to use datapump to take that table to
DEV/TEST, add a sequence (like an IDENTITY column in Sybase), build a
unique index on the sequence column and do the partitioning there.
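
Once a dense sequence column exists, splitting the read is simple arithmetic: cut the ID range into equal-width slices and issue one query per slice. Below is a plain-Python sketch of that arithmetic. The column name `seq_id` and the helper `range_predicates` are hypothetical, and this only approximates what a JDBC reader does with a partition column, a lower bound, an upper bound and a partition count. Equal-width slices give equal row counts only when the IDs are evenly spread, which is why a skewed column leaves a few tasks with most of the data.

```python
def range_predicates(column, lower, upper, num_partitions):
    """Build WHERE-clause fragments that split [lower, upper] into equal-width slices."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    total = upper - lower + 1
    predicates = []
    for i in range(num_partitions):
        # Integer arithmetic spreads any remainder across the slices.
        start = lower + (total * i) // num_partitions
        end = lower + (total * (i + 1)) // num_partitions - 1
        predicates.append(f"{column} BETWEEN {start} AND {end}")
    return predicates

# Row count taken from the thread; four partitions chosen for brevity.
for predicate in range_predicates("seq_id", 1, 200_800_000, 4):
    print(predicate)
```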

HTH





Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 September 2016 at 22:37, ayan guha  wrote:

> In oracle something called row num is present in every row.  You can
> create an evenly distribution using that column. If it is one time work,
> try using sqoop. Are you using Oracle's own appliance? Then you can use
> data pump format
> On 11 Sep 2016 01:59, "Mich Talebzadeh"  wrote:
>
>> creating an Oracle sequence for a table of 200million is not going to be
>> that easy without changing the schema. It is possible to export that table
>> from prod and import it to DEV/TEST and create the sequence there.
>>
>> If it is a FACT table then the foreign keys from the Dimension tables
>> will be bitmap indexes on the FACT table so they can be potentially used.
>>
>> HTH
>>
>> On 10 September 2016 at 16:42, Takeshi Yamamuro 
>> wrote:
>>
>>> Hi,
>>>
>>> Yea, spark does not have the same functionality with sqoop.
>>> I think one of simple solutions is to assign unique ids on the oracle
>>> table by yourself.
>>> Thought?
>>>
>>> // maropu
>>>
>>>
>>> On Sun, Sep 11, 2016 at 12:37 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Strange that Oracle table of 200Million plus rows has not been
>>>> partitioned.
>>>>
>>>> What matters here is to have parallel connections from JDBC to Oracle,
>>>> each reading a sub-set of table. Any parallel fetch is going to be better
>>>> than reading with one connection from Oracle.
>>>>
>>>> Surely among 404 columns there must be one with high cardinality to
>>>> satisfy this work.
>>>>
>>>> May be you should just create table  as select * from
>>>> Oracle_table where rownum <= 100; and use that for test.
>>>>
>>>> Other alternative is to use Oracle SQL Connecter for HDFS
>>>> that
>>>> can do it for you. With 404 columns it is difficult to suggest any
>>>> alternative. Is this a FACT table?
>>>>
>>>> HTH
>>>>



>>>> On 10 September 2016 at 16:20, Ajay Chander wrote:
>>>>
>>>>> Hello Everyone,
>>>>>
>>>>> My goal is to use Spark Sql to load huge amount of data from Oracle to
>>>>> HDFS.
>>>>>
>>>>> *Table in Oracle:*
>>>>> 1) no primary key.
>>>>> 2) Has 404 columns.
>>>>> 3) Has 200,800,000 rows.
>>>>>
>>>>> *Spark SQL:*
>>>>> In my Spark SQL I want to read the

Re: Why is Spark getting Kafka data out from port 2181 ?

2016-09-10 Thread Cody Koeninger
Are you using the receiver based stream?

On Sep 10, 2016 15:45, "Eric Ho"  wrote:

> I notice that some Spark programs would contact something like 'zoo1:2181'
> when trying to suck data out of Kafka.
>
> Does the kafka data actually transported out over this port ?
>
> Typically Zookeepers use 2218 for SSL.
> If my Spark program were to use 2218, how would I specify zookeeper
> specific truststore in my Spark config ?  Do I just give -D flags via
> JAVA_OPTS ?
>
> Thx
>
> --
>
> -eric ho
>
>


Re: Is Spark 2.0 master node compatible with Spark 1.5 work node?

2016-09-10 Thread Holden Karau
I don't think a 2.0 uber jar will play nicely on a 1.5 standalone cluster.

On Saturday, September 10, 2016, Felix Cheung 
wrote:

> You should be able to get it to work with 2.0 as uber jar.
>
> What type cluster you are running on? YARN? And what distribution?
>
>
>
>
>
> On Sun, Sep 4, 2016 at 8:48 PM -0700, "Holden Karau" wrote:
>
> You really shouldn't mix different versions of Spark between the master
> and worker nodes; if you're going to upgrade, upgrade all of them. Otherwise
> you may get very confusing failures.
>
> On Monday, September 5, 2016, Rex X wrote:
>
>> Wish to use the Pivot Table feature of data frame which is available
>> since Spark 1.6. But the spark of current cluster is version 1.5. Can we
>> install Spark 2.0 on the master node to work around this?
>>
>> Thanks!
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>



Re: Spark_JDBC_Partitions

2016-09-10 Thread ayan guha
In Oracle, a pseudocolumn called ROWNUM is available on every row. You can
create an even distribution using it. If this is one-time work, try
Sqoop. Are you using Oracle's own appliance? Then you can use the Data Pump
format.
On 11 Sep 2016 01:59, "Mich Talebzadeh"  wrote:

> creating an Oracle sequence for a table of 200million is not going to be
> that easy without changing the schema. It is possible to export that table
> from prod and import it to DEV/TEST and create the sequence there.
>
> If it is a FACT table then the foreign keys from the Dimension tables will
> be bitmap indexes on the FACT table so they can be potentially used.
>
> HTH
>
> On 10 September 2016 at 16:42, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> Yea, spark does not have the same functionality with sqoop.
>> I think one of simple solutions is to assign unique ids on the oracle
>> table by yourself.
>> Thought?
>>
>> // maropu
>>
>>
>> On Sun, Sep 11, 2016 at 12:37 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Strange that Oracle table of 200Million plus rows has not been
>>> partitioned.
>>>
>>> What matters here is to have parallel connections from JDBC to Oracle,
>>> each reading a sub-set of table. Any parallel fetch is going to be better
>>> than reading with one connection from Oracle.
>>>
>>> Surely among 404 columns there must be one with high cardinality to
>>> satisfy this work.
>>>
>>> May be you should just create table  as select * from
>>> Oracle_table where rownum <= 100; and use that for test.
>>>
>>> Other alternative is to use Oracle SQL Connecter for HDFS
>>> that
>>> can do it for you. With 404 columns it is difficult to suggest any
>>> alternative. Is this a FACT table?
>>>
>>> HTH
>>>
>>> On 10 September 2016 at 16:20, Ajay Chander  wrote:
>>>
>>>> Hello Everyone,
>>>>
>>>> My goal is to use Spark Sql to load huge amount of data from Oracle to
>>>> HDFS.
>>>>
>>>> *Table in Oracle:*
>>>> 1) no primary key.
>>>> 2) Has 404 columns.
>>>> 3) Has 200,800,000 rows.
>>>>
>>>> *Spark SQL:*
>>>> In my Spark SQL I want to read the data into n number of partitions in
>>>> parallel, for which I need to provide 'partition column','lowerBound',
>>>> 'upperbound', 'numPartitions' from the table Oracle. My table in Oracle has
>>>> no such column to satisfy this need(Highly Skewed), because of it, if the
>>>> numPartitions is set to 104, 102 tasks are finished in a minute, 1 task
>>>> finishes in 20 mins and the last one takes forever.
>>>>
>>>> Is there anything I could do to distribute the data evenly into
>>>> partitions? Can we set any fake query to orchestrate this pull process, as
>>>> we do in SQOOP like this '--boundary-query "SELECT CAST(0 AS NUMBER) AS
>>>> MIN_MOD_VAL, CAST(12 AS NUMBER) AS MAX_MOD_VAL FROM DUAL"' ?
>>>>
>>>> Any pointers are appreciated.
>>>>
>>>> Thanks for your time.
>>>>
>>>> ~ Ajay
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


Re: Problems with Reading CSV Files - Java - Eclipse

2016-09-10 Thread ayan guha
It is failing with a NoClassDefFoundError for the Parquet output committer
(ParquetOutputCommitter). Maybe a build or classpath issue?
On 11 Sep 2016 01:50, "Irfan Kabli"  wrote:

> Dear Spark community members,
>
> I am trying to read a CSV file in Spark using Java API.
>
> My setup is as follows:
> > Windows Machine
> > Local deployment
> > Spark 2.0.0
> > Eclipse Scala IDE 4.0.0
>
> I am trying to read from the local file system with the following code:
>
> (Using the Java Perspective)
>
> SparkSession mySparkSession = SparkSession.builder()
>     .master("local")
>     .appName("loadingFiles")
>     .getOrCreate();
>
> Dataset<Row> myDataSet = mySparkSession.read()
>     .csv("C:/temp/pricepaid/pp-monthly-update-new-version.csv");
>
> I am getting the following error message when running the application via
> Eclipse:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Error while
> instantiating 'org.apache.spark.sql.internal.SessionState':
> at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$
> reflect(SparkSession.scala:949)
> at org.apache.spark.sql.SparkSession.sessionState$
> lzycompute(SparkSession.scala:111)
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
> at org.apache.spark.sql.SparkSession.conf$lzycompute(
> SparkSession.scala:133)
> at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:133)
> at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(
> SparkSession.scala:838)
> at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(
> SparkSession.scala:838)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.
> apply(HashMap.scala:99)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.
> apply(HashMap.scala:99)
> at scala.collection.mutable.HashTable$class.foreachEntry(
> HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.
> scala:838)
> at org.packtpub.SparkFunctionsTest.main(SparkFunctionsTest.java:110)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
> at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$
> reflect(SparkSession.scala:946)
> ... 13 more
> Caused by: java.lang.IllegalArgumentException: Error while instantiating
> 'org.apache.spark.sql.internal.SharedState':
> at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$
> reflect(SparkSession.scala:949)
> at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(
> SparkSession.scala:100)
> at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(
> SparkSession.scala:100)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.SparkSession.sharedState$
> lzycompute(SparkSession.scala:99)
> at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:98)
> at org.apache.spark.sql.internal.SessionState.<init>(
> SessionState.scala:153)
> ... 18 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
> at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$
> reflect(SparkSession.scala:946)
> ... 24 more
> Caused by: java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/
> ParquetOutputCommitter
> at org.apache.spark.sql.internal.SQLConf$.<init>(SQLConf.scala:235)
> at org.apache.spark.sql.internal.SQLConf$.<clinit>(SQLConf.scala)
> at org.apache.spark.sql.internal.SQLConf.setConfString(SQLConf.scala:711)
> at org.apache.spark.sql.internal.SharedState$$anonfun$1.apply(
> SharedState.scala:67)
> at org.apache.spark.sql.internal.SharedState$$anonfun$1.apply(
> SharedState.scala:67)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.
> scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:67)
> ... 29 more
> Caused by: java.lang.ClassNotFoundException: org.apache.parquet.hadoop.
> ParquetOutputCommitter
> at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
> at 

Why is Spark getting Kafka data out from port 2181 ?

2016-09-10 Thread Eric Ho
I notice that some Spark programs contact something like 'zoo1:2181'
when pulling data out of Kafka.

Is the Kafka data actually transported over this port?

Typically ZooKeeper uses 2218 for SSL.
If my Spark program were to use 2218, how would I specify a
ZooKeeper-specific truststore in my Spark config? Do I just pass -D flags via
JAVA_OPTS?

Thx

-- 

-eric ho


Not sure why Filter on DStream doesn't get invoked?

2016-09-10 Thread kant kodali
Hi All,

To keep my question simple, my code is below. I see that BAR gets printed
but not FOO, and I am not sure why. My batch interval is 1 second (something
I pass in when I create a Spark context). Any idea?
I have a bunch of events, and every second I want to store the number of
events where status == "Pending" (no prior state needed).

jsonMessagesDStream
    .filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String v1) throws Exception {
            System.out.println("FOO**");
            JsonParser parser = new JsonParser();
            JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
            if (jsonObj != null && jsonObj.has("status")) {
                return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
            }
            return false;
        }
    }).foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> stringJavaRDD) throws Exception {
            System.out.println("*BAR**");
            store(stringJavaRDD.count());
        }
    });


Re: SparkR API problem with subsetting distributed data frame

2016-09-10 Thread Bene
Here are a few code snippets:

The data frame looks like this:

    kfzzeit             datum      latitude longitude
1 # 2015-02-09 07:18:33 2015-02-09 52.35234 9.881965
2 # 2015-02-09 07:18:34 2015-02-09 52.35233 9.881970
3 # 2015-02-09 07:18:35 2015-02-09 52.35232 9.881975
4 # 2015-02-09 07:18:36 2015-02-09 52.35232 9.881972
5 # 2015-02-09 07:18:37 2015-02-09 52.35231 9.881973
6 # 2015-02-09 07:18:38 2015-02-09 52.35231 9.881978

I call this function with a number (position in the data frame) and a data
frame:

dirs <- function(x, dat){
  direction(startLat = dat[x,4], endLat = dat[x+1,4],
            startLon = dat[x,5], endLon = dat[x+1,5])
}

Here I get the error that an object of type 'S4' is not subsettable. This
function calls another function which does the actual calculation:

direction <- function(startLat, endLat, startLon, endLon){
  startLat <- degrees.to.radians(startLat);
  startLon <- degrees.to.radians(startLon);
  endLat <- degrees.to.radians(endLat);
  endLon <- degrees.to.radians(endLon);
  dLon <- endLon - startLon;

  dPhi <- log(tan(endLat / 2 + pi / 4) / tan(startLat / 2 + pi / 4));
  if (abs(dLon) > pi) {
    if (dLon > 0) {
      dLon <- -(2 * pi - dLon);
    } else {
      dLon <- (2 * pi + dLon);
    }
  }
  # Convert to degrees before adding 360; atan2() returns radians.
  bearing <- (radians.to.degrees(atan2(dLon, dPhi)) + 360) %% 360;
  return (bearing);
}
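
For reference, the same bearing calculation can be sketched in plain Java. This is only an illustration of the per-pair arithmetic, not SparkR code: the class and method names are made up for this example, and the coordinates are copied from the first rows of the data frame shown earlier. On a distributed data frame, each row would probably have to be paired with its successor (for example with a lead/lag style window function) rather than indexed by position.

```java
final class BearingSketch {

    /** Rhumb-line bearing in degrees, normalised to [0, 360). Inputs are in degrees. */
    static double bearing(double startLat, double startLon, double endLat, double endLon) {
        double lat1 = Math.toRadians(startLat);
        double lat2 = Math.toRadians(endLat);
        double dLon = Math.toRadians(endLon) - Math.toRadians(startLon);

        double dPhi = Math.log(Math.tan(lat2 / 2 + Math.PI / 4)
                             / Math.tan(lat1 / 2 + Math.PI / 4));
        // Take the shorter way around the globe when the longitudes straddle 180 degrees.
        if (Math.abs(dLon) > Math.PI) {
            dLon = dLon > 0 ? -(2 * Math.PI - dLon) : (2 * Math.PI + dLon);
        }
        // Convert to degrees first, then shift into [0, 360).
        return (Math.toDegrees(Math.atan2(dLon, dPhi)) + 360.0) % 360.0;
    }

    public static void main(String[] args) {
        // Latitude/longitude pairs taken from the sample rows in this message.
        double[][] track = {
            {52.35234, 9.881965},
            {52.35233, 9.881970},
            {52.35232, 9.881975},
        };
        for (int i = 0; i + 1 < track.length; i++) {
            double b = bearing(track[i][0], track[i][1], track[i + 1][0], track[i + 1][1]);
            System.out.printf("row %d -> row %d: %.1f degrees%n", i + 1, i + 2, b);
        }
    }
}
```

The conversion to degrees happens before 360 is added, so the modulo operates on degrees throughout.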


Anything more you need?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-API-problem-with-subsetting-distributed-data-frame-tp27688p27691.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: SparkR API problem with subsetting distributed data frame

2016-09-10 Thread Felix Cheung
Could you include code snippets you are running?





On Sat, Sep 10, 2016 at 1:44 AM -0700, "Bene" wrote:

Hi,

I am having a problem with the SparkR API. I need to subset a distributed
data frame so I can extract single values from it, on which I can then do
calculations.

Each row of my df has two integer values. I am creating a vector of new
values calculated as a series of sin, cos and tan functions on these two
values. Does anyone have an idea how to do this in SparkR?

So far I tried subsetting with [], [[]], subset(), but mostly I get the
error

object of type 'S4' is not subsettable

Is there any way to do such a thing in SparkR? Any help would be greatly
appreciated! Also let me know if you need more information, code etc.
Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-API-problem-with-subsetting-distributed-data-frame-tp27688.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: iterating over DataFrame Partitions sequentially

2016-09-10 Thread sujeet jog
Thank you Jakob,
It works for me.

On Sat, Sep 10, 2016 at 12:54 AM, Jakob Odersky  wrote:

> > Hi Jakob, I have a DataFrame with about 10 partitions. Based on the exact
> > content of each partition I want to batch load some other data from a DB. I
> > cannot operate in parallel due to resource constraints, hence I want
> > to iterate sequentially over each partition and perform operations.
>
>
> Ah I see. I think in that case your best option is to run several
> jobs, selecting different subsets of your dataframe for each job and
> running them one after the other. One way to do that would be to get
> the underlying RDD, map each element with its partition's index, and then
> filter and iterate over every element. E.g.:
>
> val withPartitionIndex = df.rdd.mapPartitionsWithIndex((idx, it) =>
>   it.map(elem => (idx, elem)))
>
> for (i <- 0 until n) {
>   withPartitionIndex.filter { case (idx, _) => idx == i }.foreach {
>     case (idx, elem) =>
>       // do something with elem
>   }
> }
>
> It's not the best use case of Spark, though, and will probably be a
> performance bottleneck.
>
> On Fri, Sep 9, 2016 at 11:45 AM, Jakob Odersky  wrote:
> > Hi Sujeet,
> >
> > going sequentially over all parallel, distributed data seems like a
> > counter-productive thing to do. What are you trying to accomplish?
> >
> > regards,
> > --Jakob
> >
> > On Fri, Sep 9, 2016 at 3:29 AM, sujeet jog  wrote:
> >> Hi,
> >> Is there a way to iterate over a DataFrame with n partitions
> sequentially,
> >>
> >>
> >> Thanks,
> >> Sujeet
> >>
>


Re: Is Spark 2.0 master node compatible with Spark 1.5 work node?

2016-09-10 Thread Felix Cheung
You should be able to get it to work with 2.0 as an uber jar.

What type of cluster are you running on? YARN? And what distribution?





On Sun, Sep 4, 2016 at 8:48 PM -0700, "Holden Karau" wrote:

You really shouldn't mix different versions of Spark between the master and
worker nodes. If you're going to upgrade, upgrade all of them. Otherwise you may
get very confusing failures.

On Monday, September 5, 2016, Rex X wrote:
I wish to use the pivot table feature of DataFrame, which has been available
since Spark 1.6, but the Spark version on our current cluster is 1.5. Can we
install Spark 2.0 on the master node to work around this?

Thanks!


--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau



Re: Spark CSV skip lines

2016-09-10 Thread Hyukjin Kwon
As you are reading each file as a single record via wholeTextFiles and
flattening them into records, I think you can just drop the few lines you
want.

Can you just drop or skip a few lines from reader.readAll().map(...)?

Also, are you sure this is an issue in Spark and not in the external CSV
library?

If you think so, could you share the stack trace?

On 11 Sep 2016 1:50 a.m., "Selvam Raman"  wrote:

> Hi,
>
> I had already seen these two options, but thanks for the idea anyway.
>
> I am using wholeTextFiles to read my data (because there are \n characters
> in the middle of it) and opencsv to parse it. The first two lines of my data
> are just a report. How can I eliminate them?
>
> *How to eliminate the first two lines after reading from wholeTextFiles?*
>
> val test = wholeTextFiles.flatMap { case (_, txt) =>
>   val reader = new CSVReader(new StringReader(txt))
>   reader.readAll().map(data =>
>     Row(data(3), data(4), data(7), data(9), data(14)))
> }
>
> The above code throws an ArrayIndexOutOfBoundsException for the empty line
> and the report lines.
>
>
> On Sat, Sep 10, 2016 at 3:02 PM, Hyukjin Kwon  wrote:
>
>> Hi Selvam,
>>
>> If your report is commented with any character (e.g. #), you can skip
>> these lines via comment option [1].
>>
>> If you are using Spark 1.x, then you might be able to do this by manually
>> skipping lines in the RDD and then converting it to a DataFrame, as below.
>>
>> I haven’t tested this, but I think it should work.
>>
>> val rdd = sparkContext.textFile("...")
>> val filteredRdd = rdd.mapPartitionsWithIndex { (idx, iter) =>
>>   if (idx == 0) {
>>     iter.drop(10)
>>   } else {
>>     iter
>>   }
>> }
>> val df = new CsvParser().csvRdd(sqlContext, filteredRdd)
>>
>> If you are using Spark 2.0, then it seems there is no way to manually
>> modify the source data, because loading an existing RDD or Dataset[String]
>> into a DataFrame is not yet supported.
>>
>> There is an issue open[2]. I hope this is helpful.
>>
>> Thanks.
>>
>> [1] https://github.com/apache/spark/blob/27209252f09ff73c58e60c6df8aaba73b308088c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L369
>> [2] https://issues.apache.org/jira/browse/SPARK-15463
>>
>>
>>
>>
>> On 10 Sep 2016 6:14 p.m., "Selvam Raman"  wrote:
>>
>>> Hi,
>>>
>>> I am using spark csv to read csv file. The issue is my files first n
>>> lines contains some report and followed by actual data (header and rest of
>>> the data).
>>>
>>> So how can i skip first n lines in spark csv. I dont have any specific
>>> comment character in the first byte.
>>>
>>> Please give me some idea.
>>>
>>> --
>>> Selvam Raman
>>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>>
>>
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: Spark CSV skip lines

2016-09-10 Thread Selvam Raman
Hi,

I had already seen these two options, but thanks for the idea anyway.

I am using wholeTextFiles to read my data (because there are \n characters in
the middle of it) and opencsv to parse it. The first two lines of my data are
just a report. How can I eliminate them?

*How to eliminate the first two lines after reading from wholeTextFiles?*

val test = wholeTextFiles.flatMap { case (_, txt) =>
  val reader = new CSVReader(new StringReader(txt))
  reader.readAll().map(data =>
    Row(data(3), data(4), data(7), data(9), data(14)))
}

The above code throws an ArrayIndexOutOfBoundsException for the empty line
and the report lines.
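
One possible way to avoid that exception, sketched in plain Java without Spark or opencsv: drop the leading report rows, then discard any row that is too short to contain column 14. The sketch assumes the parser has already produced a list of String arrays (which is what opencsv's readAll() is understood to return); the class name, method name and sample rows are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class SkipReportLines {

    // Column indexes picked out in the snippet from this thread.
    static final int[] WANTED = {3, 4, 7, 9, 14};
    static final int MAX_INDEX = 14;

    /** Drops the first `skip` parsed rows, then any row too short to hold the wanted columns. */
    static List<String[]> selectColumns(List<String[]> parsedRows, int skip) {
        return parsedRows.stream()
                .skip(skip)                                   // report lines at the top
                .filter(row -> row.length > MAX_INDEX)        // empty or malformed lines
                .map(row -> Arrays.stream(WANTED)
                        .mapToObj(i -> row[i])
                        .toArray(String[]::new))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"Monthly report"});            // report line 1
        rows.add(new String[] {"Generated by some tool"});    // report line 2
        rows.add(new String[] {""});                          // empty line
        String[] data = new String[15];
        for (int i = 0; i < data.length; i++) {
            data[i] = "c" + i;
        }
        rows.add(data);

        for (String[] row : selectColumns(rows, 2)) {
            System.out.println(String.join(",", row));
        }
    }
}
```

The same skip-then-filter idea should carry over to the Scala flatMap in the message, applied to the result of readAll() before the rows are turned into Row objects.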


On Sat, Sep 10, 2016 at 3:02 PM, Hyukjin Kwon  wrote:

> Hi Selvam,
>
> If your report is commented with any character (e.g. #), you can skip
> these lines via comment option [1].
>
> If you are using Spark 1.x, then you might be able to do this by manually
> skipping lines in the RDD and then converting it to a DataFrame, as below.
>
> I haven’t tested this, but I think it should work.
>
> val rdd = sparkContext.textFile("...")
> val filteredRdd = rdd.mapPartitionsWithIndex { (idx, iter) =>
>   if (idx == 0) {
>     iter.drop(10)
>   } else {
>     iter
>   }
> }
> val df = new CsvParser().csvRdd(sqlContext, filteredRdd)
>
> If you are using Spark 2.0, then it seems there is no way to manually
> modify the source data, because loading an existing RDD or Dataset[String]
> into a DataFrame is not yet supported.
>
> There is an issue open[2]. I hope this is helpful.
>
> Thanks.
>
> [1] https://github.com/apache/spark/blob/27209252f09ff73c58e60c6df8aaba73b308088c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L369
> [2] https://issues.apache.org/jira/browse/SPARK-15463
>
>
>
>
> On 10 Sep 2016 6:14 p.m., "Selvam Raman"  wrote:
>
>> Hi,
>>
>> I am using spark csv to read csv file. The issue is my files first n
>> lines contains some report and followed by actual data (header and rest of
>> the data).
>>
>> So how can i skip first n lines in spark csv. I dont have any specific
>> comment character in the first byte.
>>
>> Please give me some idea.
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Reading a TSV file

2016-09-10 Thread Hyukjin Kwon
Yep. Also, sep is preferred and takes precedence over delimiter.

2016-09-11 0:44 GMT+09:00 Jacek Laskowski :

> Hi Muhammad,
>
> sep or delimiter should both work fine.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sat, Sep 10, 2016 at 10:42 AM, Muhammad Asif Abbasi
>  wrote:
> > Thanks for responding. I believe I had already given a Scala example as
> > part of my code in the second email.
> >
> > I just looked at the DataFrameReader code, and it appears the following
> > would work in Java.
> >
> > Dataset<Row> pricePaidDS = spark.read().option("sep","\t").csv(fileName);
> >
> > Thanks for your help.
> >
> > Cheers,
> >
> >
> >
> > On Sat, Sep 10, 2016 at 2:49 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
> > wrote:
> >>
> >> Read with header false, not true
> >>
> >>  val df2 = spark.read.option("header",
> >> false).option("delimiter","\t").csv("hdfs://rhes564:9000/tmp/nw_10124772.tsv")
> >>
> >>
> >>
> >> Dr Mich Talebzadeh
> >>
> >>
> >>
> >> LinkedIn
> >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >>
> >>
> >>
> >> http://talebzadehmich.wordpress.com
> >>
> >>
> >> Disclaimer: Use it at your own risk. Any and all responsibility for any
> >> loss, damage or destruction of data or any other property which may arise
> >> from relying on this email's technical content is explicitly disclaimed. The
> >> author will in no case be liable for any monetary damages arising from such
> >> loss, damage or destruction.
> >>
> >>
> >>
> >>
> >> On 10 September 2016 at 14:46, Mich Talebzadeh <mich.talebza...@gmail.com>
> >> wrote:
> >>>
> >>> This should be pretty straightforward?
> >>>
> >>> You can create a tab-separated file from any database table with bulk copy
> >>> out (MSSQL, Sybase, etc.):
> >>>
> >>>  bcp scratchpad..nw_10124772 out nw_10124772.tsv -c -t '\t' -Usa -A16384
> >>> Password:
> >>> Starting copy...
> >>> 441 rows copied.
> >>>
> >>> more nw_10124772.tsv
> >>> Mar 22 2011 12:00:00:000AM  SBT 602424  10124772FUNDS
> >>> TRANSFER , FROM A/C 17904064  200.00  200.00
> >>> Mar 22 2011 12:00:00:000AM  SBT 602424  10124772FUNDS
> >>> TRANSFER , FROM A/C 36226823  454.74  654.74
> >>>
> >>> Put that file into hdfs. Note that it has no headers
> >>>
> >>> Read in as a tsv file
> >>>
> >>> scala> val df2 = spark.read.option("header",
> >>> true).option("delimiter","\t").csv("hdfs://rhes564:9000/tmp/nw_10124772.tsv")
> >>> df2: org.apache.spark.sql.DataFrame = [Mar 22 2011 12:00:00:000AM:
> >>> string, SBT: string ... 6 more fields]
> >>>
> >>> scala> df2.first
> >>> res7: org.apache.spark.sql.Row = [Mar 22 2011
> >>> 12:00:00:000AM,SBT,602424,10124772,FUNDS TRANSFER , FROM A/C
> >>> 17904064,200.00,,200.00]
> >>>
> >>> HTH
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On 10 September 2016 at 13:57, Mich Talebzadeh wrote:
> >>>>
> >>>> Thanks Jacek.
> >>>>
> >>>> The old stuff with databricks
> >>>>
> >>>> scala> val df =
> >>>> spark.read.format("com.databricks.spark.csv").option("inferSchema",
> >>>> "true").option("header",
> >>>> "true").load("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")
> >>>> df: org.apache.spark.sql.DataFrame = [Transaction Date: string,
> >>>> Transaction Type: string ... 7 more fields]
> >>>>
> >>>> Now I can do
> >>>>
> >>>> scala> val df2 = spark.read.option("header",
> >>>> true).csv("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")
> >>>> df2: org.apache.spark.sql.DataFrame = [Transaction Date: string,
> >>>> Transaction Type: string ... 7 more fields]
> >>>>
> >>>> About Schema stuff that apparently Spark works out itself
> >>>>
> >>>> scala> df.printSchema
> >>>> root
> >>>>  |-- Transaction Date: string (nullable = true)
> >>>>  |-- Transaction Type: string (nullable = true)
> >>>>  |-- Sort Code: string (nullable = true)
> >>>>  |-- Account Number: integer (nullable = true)
> >>>>  |-- Transaction Description: string (nullable = true)
> >>>>  |-- Debit Amount: double (nullable = true)
> >>>>  |-- Credit Amount: double (nullable = true)
> >>>>  |-- Balance: double (nullable = true)
> >>>>  |-- _c8: string (nullable = true)
> >>>>
> >>>>

Re: Spark CSV output

2016-09-10 Thread Hyukjin Kwon
Have you tried the quote-related options (e.g. `quote` or `quoteMode`; see
https://github.com/databricks/spark-csv/blob/master/README.md#features)?

On 11 Sep 2016 12:22 a.m., "ayan guha"  wrote:

> The CSV standard uses quotes to identify multiline output.
> On 11 Sep 2016 01:04, "KhajaAsmath Mohammed" wrote:
>
>> Hi,
>>
>> I am using the package com.databricks.spark.csv to save DataFrame
>> output to an HDFS path. I am able to write the output, but there are
>> quotation marks at the start and end of each string. Has anyone resolved
>> this when using the com.databricks.spark.csv package?
>>
>> "An account was successfully logged on.|NULL SID|-|-|0x0"
>>
>> The line above is sample output that I got, with the quotation marks.
>>
>> Thanks,
>> Asmath.
>>
>


Spark using my last job resources and jar files

2016-09-10 Thread nagaraj
I am new to Spark. I am trying to run a Spark job in client mode, and it works
well if I use the same path for the jar and other resource files. But if I kill
the running application with the YARN command and resubmit the job with
updated jar and file locations, the job still uses my old paths. After a
system reboot, the Spark job picks up the new paths. The spark-submit command is:

spark-submit \
  --class export.streaming.DataExportStreaming \
  --jars /usr/hdp/current/spark-client/lib/postgresql-9.4.1209.jar \
  --driver-class-path /usr/hdp/current/spark-client/lib/postgresql-9.4.1209.jar \
  --conf spark.driver.extraClassPath=/usr/hdp/current/spark-client/lib/postgresql-9.4.1209.jar \
  --conf spark.executor.extraClassPath=/usr/hdp/current/spark-client/lib/postgresql-9.4.1209.jar \
  --master yarn --deploy-mode client \
  --files /usr/lib/firebet-spark/52.0.2-1/data-export/resources/rollup_selfservice_session_daily.sql,/usr/lib/firebet-spark/52.0.2-1/data-export/resources/rollup_selfservice_session_device.sql,/usr/lib/firebet-spark/52.0.2-1/data-export/resources/rollup_selfservice_session_workflow.sql,/usr/lib/firebet-spark/52.0.2-1/data-export/resources/rollup_selfservice_session_workflow_step.sql,/usr/lib/firebet-spark/52.0.2-1/data-export/resources/rollup_session_assignment.sql,/usr/lib/firebet-spark/52.0.2-1/data-export/resources/rollup_session_daily.sql,/usr/lib/firebet-spark/52.0.2-1/data-export/resources/rollup_session_device.sql,/usr/lib/firebet-spark/52.0.2-1/data-export/resources/rollup_session_queue.sql,/usr/lib/firebet-spark/52.0.2-1/data-export/resources/rollup_session_workflow.sql,/usr/lib/firebet-spark/52.0.2-1/data-export/resources/rollup_session_workflow_step.sql,/usr/lib/firebet-spark/52.0.2-1/data-export/resources/rollup_user_login_session.sql \
  /usr/lib/firebet-spark/52.0.2-1/data-export/lib/data-export-assembly-52.0.2-1.jar \
  /usr/lib/firebet-spark/52.0.2-1/data-export/resources/application.conf

How can I fix this problem? Is the spark-submit command correct? Which
deployment mode is better in production, client or cluster?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-using-my-last-job-resources-and-jar-files-tp27690.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark_JDBC_Partitions

2016-09-10 Thread Mich Talebzadeh
Creating an Oracle sequence for a table of 200 million rows is not going to be
that easy without changing the schema. It is possible to export that table
from prod, import it into DEV/TEST, and create the sequence there.

If it is a FACT table, then the foreign keys from the dimension tables will
be bitmap indexes on the FACT table, so they can potentially be used.

HTH

Dr Mich Talebzadeh



LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 September 2016 at 16:42, Takeshi Yamamuro wrote:

> Hi,
>
> Yes, Spark does not have the same functionality as Sqoop.
> I think one simple solution is to assign unique IDs to the Oracle
> table yourself.
> Thoughts?
>
> // maropu
>
>
> On Sun, Sep 11, 2016 at 12:37 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Strange that an Oracle table of 200 million plus rows has not been
>> partitioned.
>>
>> What matters here is to have parallel connections from JDBC to Oracle,
>> each reading a subset of the table. Any parallel fetch is going to be better
>> than reading with one connection from Oracle.
>>
>> Surely among 404 columns there must be one with high cardinality to
>> satisfy this work.
>>
>> Maybe you should just create table  as select * from Oracle_table
>> where rownum <= 100; and use that for testing.
>>
>> The other alternative is to use Oracle SQL Connector for HDFS, which
>> can do it for you. With 404 columns it is difficult to suggest any
>> alternative. Is this a FACT table?
>>
>> HTH
>>
>>
>>
>>
>>
>>
>> On 10 September 2016 at 16:20, Ajay Chander  wrote:
>>
>>> Hello Everyone,
>>>
>>> My goal is to use Spark SQL to load a huge amount of data from Oracle to
>>> HDFS.
>>>
>>> *Table in Oracle:*
>>> 1) No primary key.
>>> 2) Has 404 columns.
>>> 3) Has 200,800,000 rows.
>>>
>>> *Spark SQL:*
>>> In Spark SQL I want to read the data into n partitions in
>>> parallel, for which I need to provide 'partitionColumn', 'lowerBound',
>>> 'upperBound' and 'numPartitions' for the Oracle table. My table in Oracle has
>>> no column that satisfies this need (the data is highly skewed). Because of
>>> that, if numPartitions is set to 104, 102 tasks finish in a minute, 1 task
>>> finishes in 20 mins and the last one takes forever.
>>>
>>> Is there anything I could do to distribute the data evenly into
>>> partitions? Can we set a fake query to orchestrate this pull process, as
>>> we do in Sqoop with '--boundary-query "SELECT CAST(0 AS NUMBER) AS
>>> MIN_MOD_VAL, CAST(12 AS NUMBER) AS MAX_MOD_VAL FROM DUAL"'?
>>>
>>> Any pointers are appreciated.
>>>
>>> Thanks for your time.
>>>
>>> ~ Ajay
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Problems with Reading CSV Files - Java - Eclipse

2016-09-10 Thread Irfan Kabli
Dear Spark community members,

I am trying to read a CSV file in Spark using Java API.

My setup is as follows:
> Windows Machine
> Local deployment
> Spark 2.0.0
> Eclipse Scala IDE 4.0.0

I am trying to read from the local file system with the following code:

(Using the Java Perspective)

SparkSession mySparkSession = SparkSession.builder()
    .master("local")
    .appName("loadingFiles")
    .getOrCreate();

Dataset<Row> myDataSet =
    mySparkSession.read().csv("C:/temp/pricepaid/pp-monthly-update-new-version.csv");

I am getting the following error message when running the application via
Eclipse:

Exception in thread "main" java.lang.IllegalArgumentException: Error while
instantiating 'org.apache.spark.sql.internal.SessionState':
at
org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$reflect(SparkSession.scala:949)
at
org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:111)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:133)
at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:133)
at
org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(SparkSession.scala:838)
at
org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(SparkSession.scala:838)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:838)
at org.packtpub.SparkFunctionsTest.main(SparkFunctionsTest.java:110)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at
org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$reflect(SparkSession.scala:946)
... 13 more
Caused by: java.lang.IllegalArgumentException: Error while instantiating
'org.apache.spark.sql.internal.SharedState':
at
org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$reflect(SparkSession.scala:949)
at
org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:100)
at
org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:100)
at scala.Option.getOrElse(Option.scala:121)
at
org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:99)
at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:98)
at org.apache.spark.sql.internal.SessionState.<init>(SessionState.scala:153)
... 18 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at
org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$reflect(SparkSession.scala:946)
... 24 more
Caused by: java.lang.NoClassDefFoundError:
org/apache/parquet/hadoop/ParquetOutputCommitter
at org.apache.spark.sql.internal.SQLConf$.<init>(SQLConf.scala:235)
at org.apache.spark.sql.internal.SQLConf$.<clinit>(SQLConf.scala)
at org.apache.spark.sql.internal.SQLConf.setConfString(SQLConf.scala:711)
at
org.apache.spark.sql.internal.SharedState$$anonfun$1.apply(SharedState.scala:67)
at
org.apache.spark.sql.internal.SharedState$$anonfun$1.apply(SharedState.scala:67)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:67)
... 29 more
Caused by: java.lang.ClassNotFoundException:
org.apache.parquet.hadoop.ParquetOutputCommitter
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 37 more
16/09/10 16:48:14 INFO SparkContext: Invoking stop() from shutdown hook



Any ideas would be highly appreciated.

Best Regards,
Irfan
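
The innermost cause in the trace above is a ClassNotFoundException for org.apache.parquet.hadoop.ParquetOutputCommitter, which suggests that class is missing from the runtime classpath of the Eclipse launch. A small, hedged diagnostic in plain Java can confirm whether a given class is visible before Spark is started. The class and method names here are made up for illustration, and the idea that the class normally ships in the parquet-hadoop artifact is an assumption to verify against your build.

```java
final class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only locate the class, do not run its static initialisers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "org.apache.parquet.hadoop.ParquetOutputCommitter";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

If this prints false when run with the same Eclipse run configuration, the project's build path is the place to look, rather than the CSV-reading code itself.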


Re: Reading a TSV file

2016-09-10 Thread Jacek Laskowski
Hi Muhammad,

sep or delimiter should both work fine.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sat, Sep 10, 2016 at 10:42 AM, Muhammad Asif Abbasi
 wrote:
> Thanks for responding. I believe I had already given a Scala example as part
> of my code in the second email.
>
> I just looked at the DataFrameReader code, and it appears the following would
> work in Java.
>
> Dataset<Row> pricePaidDS = spark.read().option("sep","\t").csv(fileName);
>
> Thanks for your help.
>
> Cheers,
>
>
>
> On Sat, Sep 10, 2016 at 2:49 PM, Mich Talebzadeh wrote:
>>
>> Read with header false, not true
>>
>>  val df2 = spark.read.option("header",
>> false).option("delimiter","\t").csv("hdfs://rhes564:9000/tmp/nw_10124772.tsv")
>>
>>
>>
>>
>>
>>
>>
>> On 10 September 2016 at 14:46, Mich Talebzadeh wrote:
>>>
>>> This should be pretty straightforward?
>>>
>>> You can create a tab-separated file from any database table with bulk copy
>>> out (MSSQL, Sybase, etc.):
>>>
>>>  bcp scratchpad..nw_10124772 out nw_10124772.tsv -c -t '\t' -Usa -A16384
>>> Password:
>>> Starting copy...
>>> 441 rows copied.
>>>
>>> more nw_10124772.tsv
>>> Mar 22 2011 12:00:00:000AM  SBT 602424  10124772FUNDS
>>> TRANSFER , FROM A/C 17904064  200.00  200.00
>>> Mar 22 2011 12:00:00:000AM  SBT 602424  10124772FUNDS
>>> TRANSFER , FROM A/C 36226823  454.74  654.74
>>>
>>> Put that file into hdfs. Note that it has no headers
>>>
>>> Read in as a tsv file
>>>
>>> scala> val df2 = spark.read.option("header",
>>> true).option("delimiter","\t").csv("hdfs://rhes564:9000/tmp/nw_10124772.tsv")
>>> df2: org.apache.spark.sql.DataFrame = [Mar 22 2011 12:00:00:000AM:
>>> string, SBT: string ... 6 more fields]
>>>
>>> scala> df2.first
>>> res7: org.apache.spark.sql.Row = [Mar 22 2011
>>> 12:00:00:000AM,SBT,602424,10124772,FUNDS TRANSFER , FROM A/C
>>> 17904064,200.00,,200.00]
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 10 September 2016 at 13:57, Mich Talebzadeh wrote:
>>>>
>>>> Thanks Jacek.
>>>>
>>>> The old stuff with databricks
>>>>
>>>> scala> val df =
>>>> spark.read.format("com.databricks.spark.csv").option("inferSchema",
>>>> "true").option("header",
>>>> "true").load("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")
>>>> df: org.apache.spark.sql.DataFrame = [Transaction Date: string,
>>>> Transaction Type: string ... 7 more fields]
>>>>
>>>> Now I can do
>>>>
>>>> scala> val df2 = spark.read.option("header",
>>>> true).csv("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")
>>>> df2: org.apache.spark.sql.DataFrame = [Transaction Date: string,
>>>> Transaction Type: string ... 7 more fields]
>>>>
>>>> About Schema stuff that apparently Spark works out itself
>>>>
>>>> scala> df.printSchema
>>>> root
>>>>  |-- Transaction Date: string (nullable = true)
>>>>  |-- Transaction Type: string (nullable = true)
>>>>  |-- Sort Code: string (nullable = true)
>>>>  |-- Account Number: integer (nullable = true)
>>>>  |-- Transaction Description: string (nullable = true)
>>>>  |-- Debit Amount: double (nullable = true)
>>>>  |-- Credit Amount: double (nullable = true)
>>>>  |-- Balance: double (nullable = true)
>>>>  |-- _c8: string (nullable = true)
>>>>
>>>> scala> df2.printSchema
>>>> root
>>>>  |-- Transaction Date: string (nullable = true)
>>>>  |-- Transaction Type: string (nullable = true)
>>>>  |-- Sort Code: string (nullable = true)
>>>>  |-- Account Number: string (nullable = true)
>>>>  |-- Transaction Description: string (nullable = true)
>>>>  |-- Debit Amount: string (nullable = true)
>>>>  |-- Credit Amount: string (nullable = true)
>>>>  |-- Balance: string (nullable = true)
>>>>  |-- _c8: string (nullable = true)


Re: Spark_JDBC_Partitions

2016-09-10 Thread Takeshi Yamamuro
Hi,

Yeah, Spark does not have the same functionality as Sqoop here.
I think one simple solution is to assign unique ids to the rows of the
Oracle table yourself.
Thoughts?

// maropu
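
A sketch of one way to spread the read evenly without a numeric partition
column: instead of partitionColumn/lowerBound/upperBound, hand Spark one
WHERE predicate per partition. This is not the unique-id approach suggested
above; it swaps in Oracle's ORA_HASH over the ROWID pseudocolumn, on the
assumption that hashing ROWID gives roughly uniform buckets for the table in
question. The object and method names are invented for illustration.

```scala
object JdbcBuckets {
  // Build one WHERE-clause predicate per partition.
  // ORA_HASH(expr, maxBucket) returns a bucket number in 0..maxBucket,
  // so maxBucket is numPartitions - 1 and bucket i maps to partition i.
  def rowidPredicates(numPartitions: Int): Array[String] = {
    require(numPartitions > 0, "numPartitions must be positive")
    val maxBucket = numPartitions - 1
    (0 until numPartitions)
      .map(i => s"ORA_HASH(ROWID, $maxBucket) = $i")
      .toArray
  }
}
```

The resulting array could then be passed to the predicates overload of
DataFrameReader.jdbc, for example
spark.read.jdbc(url, table, JdbcBuckets.rowidPredicates(104), props), where
url, table and props are placeholders for your own connection details. Each
predicate becomes one partition and one JDBC connection.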


On Sun, Sep 11, 2016 at 12:37 AM, Mich Talebzadeh  wrote:

> Strange that Oracle table of 200Million plus rows has not been partitioned.
>
> What matters here is to have parallel connections from JDBC to Oracle,
> each reading a sub-set of table. Any parallel fetch is going to be better
> than reading with one connection from Oracle.
>
> Surely among 404 columns there must be one with high cardinality to
> satisfy this work.
>
> May be you should just create table  as select * from Oracle_table
> where rownum <= 100; and use that for test.
>
> Other alternative is to use Oracle SQL Connecter for HDFS
> that
> can do it for you. With 404 columns it is difficult to suggest any
> alternative. Is this a FACT table?
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
> On 10 September 2016 at 16:20, Ajay Chander  wrote:
>
>> Hello Everyone,
>>
>> My goal is to use Spark Sql to load huge amount of data from Oracle to
>> HDFS.
>>
>> *Table in Oracle:*
>> 1) no primary key.
>> 2) Has 404 columns.
>> 3) Has 200,800,000 rows.
>>
>> *Spark SQL:*
>> In my Spark SQL I want to read the data into n number of partitions in
>> parallel, for which I need to provide 'partition column','lowerBound',
>> 'upperbound', 'numPartitions' from the table Oracle. My table in Oracle has
>> no such column to satisfy this need(Highly Skewed), because of it, if the
>> numPartitions is set to 104, 102 tasks are finished in a minute, 1 task
>> finishes in 20 mins and the last one takes forever.
>>
>> Is there anything I could do to distribute the data evenly into
>> partitions? Can we set any fake query to orchestrate this pull process, as
>> we do in SQOOP like this '--boundary-query "SELECT CAST(0 AS NUMBER) AS
>> MIN_MOD_VAL, CAST(12 AS NUMBER) AS MAX_MOD_VAL FROM DUAL"' ?
>>
>> Any pointers are appreciated.
>>
>> Thanks for your time.
>>
>> ~ Ajay
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Spark_JDBC_Partitions

2016-09-10 Thread Mich Talebzadeh
Strange that an Oracle table of 200 million plus rows has not been
partitioned.

What matters here is to have parallel connections from JDBC to Oracle, each
reading a subset of the table. Any parallel fetch is going to be better than
reading with one connection from Oracle.

Surely among 404 columns there must be one with high enough cardinality to
satisfy this need.

Maybe you should just create a table as select * from Oracle_table
where rownum <= 100; and use that for testing.

Another alternative is to use Oracle SQL Connector for HDFS, which
can do it for you. With 404 columns it is difficult to suggest any other
alternative. Is this a FACT table?

HTH



Dr Mich Talebzadeh




On 10 September 2016 at 16:20, Ajay Chander  wrote:

> Hello Everyone,
>
> My goal is to use Spark Sql to load huge amount of data from Oracle to
> HDFS.
>
> *Table in Oracle:*
> 1) no primary key.
> 2) Has 404 columns.
> 3) Has 200,800,000 rows.
>
> *Spark SQL:*
> In my Spark SQL I want to read the data into n number of partitions in
> parallel, for which I need to provide 'partition column','lowerBound',
> 'upperbound', 'numPartitions' from the table Oracle. My table in Oracle has
> no such column to satisfy this need(Highly Skewed), because of it, if the
> numPartitions is set to 104, 102 tasks are finished in a minute, 1 task
> finishes in 20 mins and the last one takes forever.
>
> Is there anything I could do to distribute the data evenly into
> partitions? Can we set any fake query to orchestrate this pull process, as
> we do in SQOOP like this '--boundary-query "SELECT CAST(0 AS NUMBER) AS
> MIN_MOD_VAL, CAST(12 AS NUMBER) AS MAX_MOD_VAL FROM DUAL"' ?
>
> Any pointers are appreciated.
>
> Thanks for your time.
>
> ~ Ajay
>


Re: java.io.IOException: FAILED_TO_UNCOMPRESS(5)

2016-09-10 Thread Takeshi Yamamuro
Hi,

This seems to be a known issue; see https://issues.apache.org/jira/browse/SPARK-4105

// maropu

On Sat, Sep 10, 2016 at 11:08 PM, 齐忠  wrote:

> Hi all
>
> when use default compression snappy,I get error when spark doing shuffle
>
> 16/09/09 08:33:15 ERROR executor.Executor: Managed memory leak detected;
> size = 89817648 bytes, TID = 20912
> 16/09/09 08:33:15 ERROR executor.Executor: Exception in task 63.2 in stage
> 1.0 (TID 20912)
> java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> at org.xerial.snappy.SnappyNative.throw_error(
> SnappyNative.java:98)
> at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:465)
> at org.xerial.snappy.Snappy.uncompress(Snappy.java:504)
> at org.xerial.snappy.SnappyInputStream.readFully(
> SnappyInputStream.java:147)
> at org.xerial.snappy.SnappyInputStream.readHeader(
> SnappyInputStream.java:99)
> at org.xerial.snappy.SnappyInputStream.(
> SnappyInputStream.java:59)
> at org.apache.spark.io.SnappyCompressionCodec.
> compressedInputStream(CompressionCodec.scala:159)
> at org.apache.spark.storage.BlockManager.wrapForCompression(
> BlockManager.scala:1186)
> at org.apache.spark.shuffle.BlockStoreShuffleReader$$
> anonfun$2.apply(BlockStoreShuffleReader.scala:53)
> at org.apache.spark.shuffle.BlockStoreShuffleReader$$
> anonfun$2.apply(BlockStoreShuffleReader.scala:52)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at org.apache.spark.util.CompletionIterator.hasNext(
> CompletionIterator.scala:32)
> at org.apache.spark.InterruptibleIterator.hasNext(
> InterruptibleIterator.scala:39)
> at org.apache.spark.util.collection.ExternalAppendOnlyMap.
> insertAll(ExternalAppendOnlyMap.scala:152)
> at org.apache.spark.Aggregator.combineCombinersByKey(
> Aggregator.scala:58)
> at org.apache.spark.shuffle.BlockStoreShuffleReader.read(
> BlockStoreShuffleReader.scala:83)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> env info
>
> spark on yarn(cluster)scalaVersion := "2.10.6"
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % 
> "provided"libraryDependencies += "org.apache.spark" %% "spark-mllib" % 
> "1.6.0" % "provided"
>
>
> ​THANKS​
>
>
> --
> cente...@gmail.com
>



-- 
---
Takeshi Yamamuro


Re: Spark CSV output

2016-09-10 Thread ayan guha
The CSV standard uses quotes to identify multiline fields.

On 11 Sep 2016 01:04, "KhajaAsmath Mohammed" 
wrote:

> Hi,
>
> I am using the package com.databricks.spark.csv to save the dataframe
> output to hdfs path. I am able to write the output but there are quotations
> before and after end of the string. Did anyone resolve it when usinig it
> with com.databricks.spark.csv package.
>
> "An account was successfully logged on.|NULL SID|-|-|0x0"
>
> Here is sample output that I got with quotations.
>
> Thanks,
> Asmath.
>
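
For what it is worth, the usual CSV convention (RFC 4180 style) is to wrap a
field in quotes only when it contains the delimiter, a quote character or a
line break. The sketch below is a plain Scala illustration of that rule, not
the code spark-csv actually runs; quoteIfNeeded and its parameters are names
invented here.

```scala
object CsvQuoting {
  // Quote a field only when it contains the delimiter, the quote character,
  // or a line break. Embedded quote characters are doubled.
  def quoteIfNeeded(field: String, delimiter: Char = ',', quote: Char = '"'): String = {
    val needsQuoting =
      field.exists(c => c == delimiter || c == quote || c == '\n' || c == '\r')
    if (needsQuoting) {
      val escaped = field.replace(quote.toString, quote.toString * 2)
      s"$quote$escaped$quote"
    } else {
      field
    }
  }
}
```

Under this rule the sample line would be quoted if the writer's delimiter
were '|', since the value itself contains that character. Whether that is
what happened in the job above is a guess.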


Spark_JDBC_Partitions

2016-09-10 Thread Ajay Chander
Hello Everyone,

My goal is to use Spark SQL to load a huge amount of data from Oracle into
HDFS.

*Table in Oracle:*
1) no primary key.
2) Has 404 columns.
3) Has 200,800,000 rows.

*Spark SQL:*
In Spark SQL I want to read the data into n partitions in parallel, for
which I need to provide 'partitionColumn', 'lowerBound', 'upperBound' and
'numPartitions' for the Oracle table. My table has no column that satisfies
this need (the candidates are highly skewed). Because of that, if
numPartitions is set to 104, 102 tasks finish in a minute, 1 task finishes
in 20 minutes and the last one takes forever.

Is there anything I could do to distribute the data evenly into partitions?
Can we set any fake query to orchestrate this pull process, as we do in
SQOOP like this '--boundary-query "SELECT CAST(0 AS NUMBER) AS MIN_MOD_VAL,
CAST(12 AS NUMBER) AS MAX_MOD_VAL FROM DUAL"' ?

Any pointers are appreciated.

Thanks for your time.

~ Ajay


Spark CSV output

2016-09-10 Thread KhajaAsmath Mohammed
Hi,

I am using the com.databricks.spark.csv package to save DataFrame output to
an HDFS path. I am able to write the output, but there are quotation marks
before and after the string. Has anyone resolved this when using the
com.databricks.spark.csv package?

"An account was successfully logged on.|NULL SID|-|-|0x0"

The line above is sample output that I got with the quotation marks.

Thanks,
Asmath.


Re: Reading a TSV file

2016-09-10 Thread Muhammad Asif Abbasi
Thanks for responding. I believe I had already given a Scala example as
part of my code in the second email.

I just looked at the DataFrameReader code, and it appears the following
would work in Java:

Dataset<Row> pricePaidDS = spark.read().option("sep", "\t").csv(fileName);

Thanks for your help.

Cheers,



On Sat, Sep 10, 2016 at 2:49 PM, Mich Talebzadeh 
wrote:

> Read header false not true
>
>  val df2 = spark.read.option("header", false).option("delimiter","\t"
> ).csv("hdfs://rhes564:9000/tmp/nw_10124772.tsv")
>
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
> On 10 September 2016 at 14:46, Mich Talebzadeh 
> wrote:
>
>> This should be pretty straight forward?
>>
>> You can create a tab separated file from any database table and buck copy
>> out, MSSQL, Sybase etc
>>
>>  bcp scratchpad..nw_10124772 out nw_10124772.tsv -c *-t '\t' *-Usa
>> -A16384
>> Password:
>> Starting copy...
>> 441 rows copied.
>>
>> more nw_10124772.tsv
>> Mar 22 2011 12:00:00:000AM  SBT 602424  10124772FUNDS
>> TRANSFER , FROM A/C 17904064  200.00  200.00
>> Mar 22 2011 12:00:00:000AM  SBT 602424  10124772FUNDS
>> TRANSFER , FROM A/C 36226823  454.74  654.74
>>
>> Put that file into hdfs. Note that it has no headers
>>
>> Read in as a tsv file
>>
>> scala> val df2 = spark.read.option("header",
>> true).option("delimiter","\t").csv("hdfs://rhes564:9000/tmp/
>> nw_10124772.tsv")
>> df2: org.apache.spark.sql.DataFrame = [Mar 22 2011 12:00:00:000AM:
>> string, SBT: string ... 6 more fields]
>>
>> scala> df2.first
>> res7: org.apache.spark.sql.Row = [Mar 22 2011
>> 12:00:00:000AM,SBT,602424,10124772,FUNDS TRANSFER , FROM A/C
>> 17904064,200.00,,200.00]
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>> On 10 September 2016 at 13:57, Mich Talebzadeh > > wrote:
>>
>>> Thanks Jacek.
>>>
>>> The old stuff with databricks
>>>
>>> scala> val df = spark.read.format("com.databri
>>> cks.spark.csv").option("inferSchema", "true").option("header",
>>> "true").load("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")
>>> df: org.apache.spark.sql.DataFrame = [Transaction Date: string,
>>> Transaction Type: string ... 7 more fields]
>>>
>>> Now I can do
>>>
>>> scala> val df2 = spark.read.option("header",
>>> true).csv("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")
>>> df2: org.apache.spark.sql.DataFrame = [Transaction Date: string,
>>> Transaction Type: string ... 7 more fields]
>>>
>>> About Schema stuff that apparently Spark works out itself
>>>
>>> scala> df.printSchema
>>> root
>>>  |-- Transaction Date: string (nullable = true)
>>>  |-- Transaction Type: string (nullable = true)
>>>  |-- Sort Code: string (nullable = true)
>>>  |-- Account Number: integer (nullable = true)
>>>  |-- Transaction Description: string (nullable = true)
>>>  |-- Debit Amount: double (nullable = true)
>>>  |-- Credit Amount: double (nullable = true)
>>>  |-- Balance: double (nullable = true)
>>>  |-- _c8: string (nullable = true)
>>>
>>> scala> df2.printSchema
>>> root
>>>  |-- Transaction Date: string (nullable = true)
>>>  |-- Transaction Type: string (nullable = true)
>>>  |-- Sort Code: string (nullable = true)
>>>  |-- Account Number: string (nullable = true)
>>>  |-- Transaction Description: string (nullable = true)
>>>  |-- Debit Amount: string (nullable = true)
>>>  |-- Credit Amount: string (nullable = true)
>>>  |-- Balance: string (nullable = true)
>>>  |-- _c8: string (nullable = true)
>>>
>>> Cheers
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>

java.io.IOException: FAILED_TO_UNCOMPRESS(5)

2016-09-10 Thread 齐忠
Hi all

When using the default compression codec (snappy), I get an error while
Spark is doing a shuffle:

16/09/09 08:33:15 ERROR executor.Executor: Managed memory leak detected;
size = 89817648 bytes, TID = 20912
16/09/09 08:33:15 ERROR executor.Executor: Exception in task 63.2 in stage
1.0 (TID 20912)
java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:465)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:504)
    at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
    at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
    at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
    at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:159)
    at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1186)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:53)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:52)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
    at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:58)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:83)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

env info

spark on yarn (cluster)
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.6.0" % "provided"


Thanks


-- 
cente...@gmail.com


Re: Reading a TSV file

2016-09-10 Thread Mich Talebzadeh
Read with header set to false, not true:

 val df2 = spark.read.option("header",
false).option("delimiter","\t").csv("hdfs://rhes564:9000/tmp/nw_10124772.tsv")
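
To see why header=true goes wrong on a file with no header line, here is a
toy model in plain Scala (not Spark code; HeaderDemo is an invented name).
With header = true the first data row is consumed as column names, which
matches the earlier df2 output quoted below, where the columns are called
"Mar 22 2011 12:00:00:000AM", "SBT" and so on. With header = false every row
stays data and the columns get generated names in the _cN style that shows
up in the printSchema output elsewhere in this thread.

```scala
import java.util.regex.Pattern

object HeaderDemo {
  // Toy model of a delimited reader's "header" option.
  // Returns (columnNames, dataRows).
  def read(lines: Seq[String], header: Boolean, sep: Char = '\t'): (Seq[String], Seq[Seq[String]]) = {
    // Split each line on the separator, keeping trailing empty fields.
    val rows: Seq[Seq[String]] =
      lines.map(_.split(Pattern.quote(sep.toString), -1).toSeq)
    if (header && rows.nonEmpty) {
      // First row is taken as the column names and removed from the data.
      (rows.head, rows.tail)
    } else {
      // No header: generate positional names and keep every row as data.
      val width = rows.headOption.map(_.length).getOrElse(0)
      val names: Seq[String] = (0 until width).map(i => s"_c$i")
      (names, rows)
    }
  }
}
```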



Dr Mich Talebzadeh




On 10 September 2016 at 14:46, Mich Talebzadeh 
wrote:

> This should be pretty straight forward?
>
> You can create a tab separated file from any database table and buck copy
> out, MSSQL, Sybase etc
>
>  bcp scratchpad..nw_10124772 out nw_10124772.tsv -c *-t '\t' *-Usa -A16384
> Password:
> Starting copy...
> 441 rows copied.
>
> more nw_10124772.tsv
> Mar 22 2011 12:00:00:000AM  SBT 602424  10124772FUNDS
> TRANSFER , FROM A/C 17904064  200.00  200.00
> Mar 22 2011 12:00:00:000AM  SBT 602424  10124772FUNDS
> TRANSFER , FROM A/C 36226823  454.74  654.74
>
> Put that file into hdfs. Note that it has no headers
>
> Read in as a tsv file
>
> scala> val df2 = spark.read.option("header", true).option("delimiter","\t")
> .csv("hdfs://rhes564:9000/tmp/nw_10124772.tsv")
> df2: org.apache.spark.sql.DataFrame = [Mar 22 2011 12:00:00:000AM: string,
> SBT: string ... 6 more fields]
>
> scala> df2.first
> res7: org.apache.spark.sql.Row = [Mar 22 2011 
> 12:00:00:000AM,SBT,602424,10124772,FUNDS
> TRANSFER , FROM A/C 17904064,200.00,,200.00]
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
> On 10 September 2016 at 13:57, Mich Talebzadeh 
> wrote:
>
>> Thanks Jacek.
>>
>> The old stuff with databricks
>>
>> scala> val df = 
>> spark.read.format("com.databricks.spark.csv").option("inferSchema",
>> "true").option("header", "true").load("hdfs://rhes564:9
>> 000/data/stg/accounts/ll/18740868")
>> df: org.apache.spark.sql.DataFrame = [Transaction Date: string,
>> Transaction Type: string ... 7 more fields]
>>
>> Now I can do
>>
>> scala> val df2 = spark.read.option("header",
>> true).csv("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")
>> df2: org.apache.spark.sql.DataFrame = [Transaction Date: string,
>> Transaction Type: string ... 7 more fields]
>>
>> About Schema stuff that apparently Spark works out itself
>>
>> scala> df.printSchema
>> root
>>  |-- Transaction Date: string (nullable = true)
>>  |-- Transaction Type: string (nullable = true)
>>  |-- Sort Code: string (nullable = true)
>>  |-- Account Number: integer (nullable = true)
>>  |-- Transaction Description: string (nullable = true)
>>  |-- Debit Amount: double (nullable = true)
>>  |-- Credit Amount: double (nullable = true)
>>  |-- Balance: double (nullable = true)
>>  |-- _c8: string (nullable = true)
>>
>> scala> df2.printSchema
>> root
>>  |-- Transaction Date: string (nullable = true)
>>  |-- Transaction Type: string (nullable = true)
>>  |-- Sort Code: string (nullable = true)
>>  |-- Account Number: string (nullable = true)
>>  |-- Transaction Description: string (nullable = true)
>>  |-- Debit Amount: string (nullable = true)
>>  |-- Credit Amount: string (nullable = true)
>>  |-- Balance: string (nullable = true)
>>  |-- _c8: string (nullable = true)
>>
>> Cheers
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>> On 10 September 2016 at 13:12, Jacek Laskowski  wrote:
>>
>>> Hi Mich,
>>>
>>> CSV is now one of the 7 formats supported by SQL in 2.0. No need to
>>> 

Re: Reading a TSV file

2016-09-10 Thread Mich Talebzadeh
This should be pretty straightforward.

You can create a tab-separated file from any database table by bulk copying
it out (bcp) from MSSQL, Sybase etc.

 bcp scratchpad..nw_10124772 out nw_10124772.tsv -c -t '\t' -Usa -A16384
Password:
Starting copy...
441 rows copied.

more nw_10124772.tsv
Mar 22 2011 12:00:00:000AM  SBT 602424  10124772FUNDS
TRANSFER , FROM A/C 17904064  200.00  200.00
Mar 22 2011 12:00:00:000AM  SBT 602424  10124772FUNDS
TRANSFER , FROM A/C 36226823  454.74  654.74

Put that file into hdfs. Note that it has no headers

Read in as a tsv file

scala> val df2 = spark.read.option("header", true).option("delimiter","\t")
.csv("hdfs://rhes564:9000/tmp/nw_10124772.tsv")
df2: org.apache.spark.sql.DataFrame = [Mar 22 2011 12:00:00:000AM: string,
SBT: string ... 6 more fields]

scala> df2.first
res7: org.apache.spark.sql.Row = [Mar 22 2011
12:00:00:000AM,SBT,602424,10124772,FUNDS TRANSFER , FROM A/C
17904064,200.00,,200.00]

HTH

Dr Mich Talebzadeh




On 10 September 2016 at 13:57, Mich Talebzadeh 
wrote:

> Thanks Jacek.
>
> The old stuff with databricks
>
> scala> val df = 
> spark.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("hdfs://rhes564:
> 9000/data/stg/accounts/ll/18740868")
> df: org.apache.spark.sql.DataFrame = [Transaction Date: string,
> Transaction Type: string ... 7 more fields]
>
> Now I can do
>
> scala> val df2 = spark.read.option("header", true).csv("hdfs://rhes564:
> 9000/data/stg/accounts/ll/18740868")
> df2: org.apache.spark.sql.DataFrame = [Transaction Date: string,
> Transaction Type: string ... 7 more fields]
>
> About Schema stuff that apparently Spark works out itself
>
> scala> df.printSchema
> root
>  |-- Transaction Date: string (nullable = true)
>  |-- Transaction Type: string (nullable = true)
>  |-- Sort Code: string (nullable = true)
>  |-- Account Number: integer (nullable = true)
>  |-- Transaction Description: string (nullable = true)
>  |-- Debit Amount: double (nullable = true)
>  |-- Credit Amount: double (nullable = true)
>  |-- Balance: double (nullable = true)
>  |-- _c8: string (nullable = true)
>
> scala> df2.printSchema
> root
>  |-- Transaction Date: string (nullable = true)
>  |-- Transaction Type: string (nullable = true)
>  |-- Sort Code: string (nullable = true)
>  |-- Account Number: string (nullable = true)
>  |-- Transaction Description: string (nullable = true)
>  |-- Debit Amount: string (nullable = true)
>  |-- Credit Amount: string (nullable = true)
>  |-- Balance: string (nullable = true)
>  |-- _c8: string (nullable = true)
>
> Cheers
>
>
>
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
> On 10 September 2016 at 13:12, Jacek Laskowski  wrote:
>
>> Hi Mich,
>>
>> CSV is now one of the 7 formats supported by SQL in 2.0. No need to
>> use "com.databricks.spark.csv" and --packages. A mere format("csv") or
>> csv(path: String) would do it. The options are same.
>>
>> p.s. Yup, when I read TSV I thought about time series data that I
>> believe got its own file format and support @ spark-packages.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sat, Sep 10, 2016 at 8:00 AM, Mich Talebzadeh
>>  wrote:
>> > I gather the title should say CSV as opposed to tsv?
>> >
>> > Also when the term spark-csv is used is it a reference to databricks
>> stuff?
>> >
>> > val df = spark.read.format("com.databricks.spark.csv").option("
>> inferSchema",
>> > "true").option("header", "true").load..
>> >
>> > or it is something new in 2 like spark-sql etc?
>> >
>> > Thanks
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > 

Re: Reading a TSV file

2016-09-10 Thread Mich Talebzadeh
Thanks Jacek.

The old way, with the Databricks package:

scala> val df =
spark.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header",
"true").load("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")
df: org.apache.spark.sql.DataFrame = [Transaction Date: string, Transaction
Type: string ... 7 more fields]

Now I can do

scala> val df2 = spark.read.option("header",
true).csv("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")
df2: org.apache.spark.sql.DataFrame = [Transaction Date: string,
Transaction Type: string ... 7 more fields]

As for the schema, which Spark apparently works out itself:

scala> df.printSchema
root
 |-- Transaction Date: string (nullable = true)
 |-- Transaction Type: string (nullable = true)
 |-- Sort Code: string (nullable = true)
 |-- Account Number: integer (nullable = true)
 |-- Transaction Description: string (nullable = true)
 |-- Debit Amount: double (nullable = true)
 |-- Credit Amount: double (nullable = true)
 |-- Balance: double (nullable = true)
 |-- _c8: string (nullable = true)

scala> df2.printSchema
root
 |-- Transaction Date: string (nullable = true)
 |-- Transaction Type: string (nullable = true)
 |-- Sort Code: string (nullable = true)
 |-- Account Number: string (nullable = true)
 |-- Transaction Description: string (nullable = true)
 |-- Debit Amount: string (nullable = true)
 |-- Credit Amount: string (nullable = true)
 |-- Balance: string (nullable = true)
 |-- _c8: string (nullable = true)
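
The difference between the two schemas above is consistent with the
inferSchema option: the first read sets inferSchema to true, the second does
not, and without it the CSV reader types every column as string. A rough
illustration of what inference does, in plain Scala (a toy, not Spark's
actual inference rules; InferDemo is an invented name):

```scala
import scala.util.Try

object InferDemo {
  // Pick the narrowest of integer / double / string that fits every
  // non-empty value in a column. Empty columns fall back to string.
  def inferType(values: Seq[String]): String = {
    val present = values.map(_.trim).filter(_.nonEmpty)
    if (present.isEmpty) "string"
    else if (present.forall(v => Try(v.toInt).isSuccess)) "integer"
    else if (present.forall(v => Try(v.toDouble).isSuccess)) "double"
    else "string"
  }
}
```

Applied to columns like the ones here, an account number column would come
out as integer, the amount columns as double, and free-text columns as
string, which is the shape of the df schema above.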

Cheers











Dr Mich Talebzadeh




On 10 September 2016 at 13:12, Jacek Laskowski  wrote:

> Hi Mich,
>
> CSV is now one of the 7 formats supported by SQL in 2.0. No need to
> use "com.databricks.spark.csv" and --packages. A mere format("csv") or
> csv(path: String) would do it. The options are same.
>
> p.s. Yup, when I read TSV I thought about time series data that I
> believe got its own file format and support @ spark-packages.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sat, Sep 10, 2016 at 8:00 AM, Mich Talebzadeh
>  wrote:
> > I gather the title should say CSV as opposed to tsv?
> >
> > Also when the term spark-csv is used is it a reference to databricks
> stuff?
> >
> > val df = spark.read.format("com.databricks.spark.csv").option(
> "inferSchema",
> > "true").option("header", "true").load..
> >
> > or it is something new in 2 like spark-sql etc?
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> >
> >
> >
> >
> > On 10 September 2016 at 12:37, Jacek Laskowski  wrote:
> >>
> >> Hi,
> >>
> >> If Spark 2.0 supports a format, use it. For CSV it's csv() or
> >> format("csv"). It should be supported by Scala and Java. If the API's
> >> broken for Java (but works for Scala), you'd have to create a "bridge"
> >> yourself or report an issue in Spark's JIRA @
> >> https://issues.apache.org/jira/browse/SPARK.
> >>
> >> Have you run into any issues with CSV and Java? Share the code.
> >>
> >> Pozdrawiam,
> >> Jacek Laskowski
> >> 
> >> https://medium.com/@jaceklaskowski/
> >> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> >> Follow me at https://twitter.com/jaceklaskowski
> >>
> >>
> >> On Sat, Sep 10, 2016 at 7:30 AM, Muhammad Asif Abbasi
> >>  wrote:
> >> > Hi,
> >> >
> >> > I would like to know what is the most efficient way of reading tsv in
> >> > Scala,
> >> > Python and Java with Spark 2.0.
> >> >
> >> > I believe with Spark 2.0 CSV is a native source based on Spark-csv
> >> > module,
> >> > and we can potentially read a "tsv" file by specifying
> >> >
> >> > 1. Option ("delimiter","\t") in Scala
> >> > 2. sep declaration in Python.
> >> >
> >> > However I am unsure what is the best way to achieve this in Java.
> >> > Furthermore, 

Re: Reading a TSV file

2016-09-10 Thread Muhammad Asif Abbasi
Thanks for the quick response.

Let me rephrase the question, which I admit wasn't clearly worded and was
perhaps too abstract.

To read a CSV I am using the following Java code (it works perfectly):

    SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("Reading a CSV")
        .config("spark.some.config.option", "some-value")
        .getOrCreate();

    Dataset<Row> pricePaidDS = spark.read().csv(fileName);

I need to read a TSV (tab-separated values) file.

With Scala, you can do the following to read a TSV:

    val testDS = spark.read.format("csv").option("delimiter", "\t")
      .load(tsvFileLocation)

With Python you can do the following:

    testDS = spark.read.csv(tsvFileLocation, sep="\t")

So while I am able to read a CSV file in Java, how do I read a TSV
(tab-separated) file? I am looking for an option to pass a delimiter while
reading the file.

Hope this clarifies the question.

Appreciate your help.

Regards,







Re: Reading a TSV file

2016-09-10 Thread Jacek Laskowski
Hi Mich,

CSV is now one of the 7 formats supported by Spark SQL in 2.0. No need to
use "com.databricks.spark.csv" and --packages. A mere format("csv") or
csv(path: String) would do it. The options are the same.
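
To make "the options are the same" concrete, here is a minimal Python sketch for the TSV case. It assumes an existing SparkSession is passed in as `spark`. The helper names `tsv_options` and `read_tsv` are made up for this example; only `DataFrameReader.options(...)` and `.csv(path)` are Spark's own. In Spark 2.0 the built-in CSV source accepts `sep` (with `delimiter` as an alias). The Java `DataFrameReader` exposes the same `option(key, value)` method, so the same key/value pair should carry over to Java.

```python
def tsv_options(header=True):
    """Reader options for a tab-separated file (hypothetical helper)."""
    return {
        "sep": "\t",                              # field delimiter: a single tab
        "header": "true" if header else "false",  # first line holds column names
    }


def read_tsv(spark, path, header=True):
    """Read a TSV file with Spark's built-in CSV source.

    `spark` is assumed to be an existing SparkSession. Nothing is imported
    here, so the function can be defined without Spark on the path.
    """
    return spark.read.options(**tsv_options(header)).csv(path)
```

Usage would look like `df = read_tsv(spark, tsvFileLocation)`, followed by `df.printSchema()` to check that the columns were split on tabs.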

p.s. Yup, when I read TSV I thought about time series data that I
believe got its own file format and support @ spark-packages.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark streaming kafka connector questions

2016-09-10 Thread Cody Koeninger
Hard to say without seeing the code, but if you run multiple actions on an
RDD without caching it, the RDD will be computed multiple times.
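
The recomputation described above can be illustrated with a toy model in plain Python. This is not Spark code: `LazyBatch`, `run_pipeline` and `fetch_from_source` are invented names, and `cache()` is eager here for simplicity, whereas Spark's `cache()` is lazy. The point is only that two branches reading the same uncached source each trigger their own fetch.

```python
class LazyBatch:
    """Toy stand-in for an uncached RDD: every traversal re-runs the source."""

    def __init__(self, fetch):
        self._fetch = fetch      # callable that produces the records
        self._cached = None

    def cache(self):
        """Materialize once so later traversals reuse the stored records."""
        self._cached = list(self._fetch())
        return self

    def records(self):
        if self._cached is not None:
            return list(self._cached)
        return list(self._fetch())   # recomputed on every call


def run_pipeline(batch):
    """Two filtered branches over the same batch, then a union."""
    branch_a = [r for r in batch.records() if r % 2 == 0]
    branch_b = [r * 10 for r in batch.records() if r % 2 == 1]
    return branch_a + branch_b


fetch_count = {"n": 0}


def fetch_from_source():
    """Stands in for a fetch from Kafka; counts how often it is called."""
    fetch_count["n"] += 1
    return [1, 2, 3, 4]
```

Running `run_pipeline(LazyBatch(fetch_from_source))` calls the source twice, once per branch, while `run_pipeline(LazyBatch(fetch_from_source).cache())` calls it once. In a real streaming job the analogous step would be calling `cache()` or `persist()` on the DStream that both filters read from; whether that removes the second fetch in this particular job is untested.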



Re: Reading a TSV file

2016-09-10 Thread Mich Talebzadeh
I gather the title should say CSV as opposed to TSV?

Also, when the term spark-csv is used, is it a reference to the Databricks
package?

val df = spark.read.format("com.databricks.spark.csv")
  .option("inferSchema", "true").option("header", "true").load..

Or is it something new in 2.0, like spark-sql etc.?

Thanks

Dr Mich Talebzadeh





Re: Reading a TSV file

2016-09-10 Thread Jacek Laskowski
Hi,

If Spark 2.0 supports a format, use it. For CSV it's csv() or
format("csv"). It should be supported by Scala and Java. If the API's
broken for Java (but works for Scala), you'd have to create a "bridge"
yourself or report an issue in Spark's JIRA @
https://issues.apache.org/jira/browse/SPARK.

Have you run into any issues with CSV and Java? Share the code.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski





Reading a TSV file

2016-09-10 Thread Muhammad Asif Abbasi
Hi,

I would like to know the most efficient way of reading a TSV file in
Scala, Python and Java with Spark 2.0.

I believe that in Spark 2.0 CSV is a native source based on the spark-csv
module, and that we can read a TSV file by specifying

1. option("delimiter", "\t") in Scala
2. the sep argument in Python.

However, I am unsure what the best way to achieve this in Java is.
Furthermore, are the above the best ways to read a TSV file?

Appreciate a response on this.

Regards.


Re: Does it run distributed if class not Serializable

2016-09-10 Thread Yan Facai
I believe that Serializable is necessary for distributing.
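
A common understanding is that the job still runs on the executors either way. What must be serializable are the objects that travel from the driver to the executors, for example anything captured by a closure. On the JVM that means `java.io.Serializable` (or Kryo); PySpark pickles such objects instead. The sketch below is a Python analogue of that check, not Spark code: the class names and the `can_ship` helper are invented for illustration.

```python
import pickle
import threading


class PlainConfig:
    """Holds only plain data, so it can be serialized and shipped."""

    def __init__(self, threshold):
        self.threshold = threshold


class ConfigWithLock:
    """Holds an OS-level lock, which cannot be serialized."""

    def __init__(self, threshold):
        self.threshold = threshold
        self.lock = threading.Lock()


def can_ship(obj):
    """Return True if `obj` survives a serialization round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (TypeError, pickle.PicklingError):
        return False
```

An object like `ConfigWithLock` fails the round trip, which is roughly what Spark's "Task not serializable" error reports on the JVM side. Creating such an object inside the task, on the executor, avoids the need to ship it at all.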

On Fri, Sep 9, 2016 at 7:10 PM, Gourav Sengupta 
wrote:

> And you are using JAVA?
>
> AND WHY?
>
> Regards,
> Gourav
>
> On Fri, Sep 9, 2016 at 11:47 AM, Yusuf Can Gürkan 
> wrote:
>
>> Hi,
>>
>> If i don't make a class Serializable (... extends Serializable) will it
>> run distributed with executors or it will only run on master machine?
>>
>> Thanks
>>
>
>


Re: SparkSQL DAG generation , DAG optimization , DAG execution

2016-09-10 Thread Mich Talebzadeh
Right, let us simplify this.

Can you run the whole thing *once* only and send the DAG execution output
from the UI?

You can use the Snipping Tool to capture the image.

HTH


Dr Mich Talebzadeh




Re: Spark CSV skip lines

2016-09-10 Thread Hyukjin Kwon
Hi Selvam,

If the report lines start with a common comment character (e.g. #), you can
skip them via the comment option [1].

If you are using Spark 1.x, you might be able to do this by manually
dropping the lines from the RDD and then converting it to a DataFrame, as
below.

I haven't tested this, but I think it should work.

import com.databricks.spark.csv.CsvParser

val rdd = sparkContext.textFile("...")
// Drop the first 10 lines; this assumes they all sit in the first partition.
val filteredRdd = rdd.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) {
    iter.drop(10)
  } else {
    iter
  }
}
// Parse the remaining lines with spark-csv (Spark 1.x).
val df = new CsvParser().csvRdd(sqlContext, filteredRdd)
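
For Python users, the same idea can be sketched as follows. This is an untested-on-a-cluster illustration: `skip_leading` and `drop_report_lines` are hypothetical helper names, `rdd` is assumed to be an RDD of text lines (for example from `sc.textFile`), and the approach assumes all `n` report lines fall inside the first partition. `mapPartitionsWithIndex` is the only Spark call used.

```python
from itertools import islice


def skip_leading(n):
    """Build a function for mapPartitionsWithIndex that drops the first
    `n` records of partition 0 and leaves other partitions untouched."""
    def per_partition(index, records):
        if index == 0:
            return islice(records, n, None)
        return records
    return per_partition


def drop_report_lines(rdd, n):
    """Apply the skip to an RDD of text lines.

    Assumes all `n` report lines fall inside the first partition.
    """
    return rdd.mapPartitionsWithIndex(skip_leading(n))
```

Keeping the per-partition function separate makes it easy to check on plain iterators before running it on real data.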

If you are using Spark 2.0, it seems there is no way to manually
modify the source data, because loading an existing RDD or Dataset[String]
into a DataFrame via the CSV source is not yet supported.

There is an open issue for this [2]. I hope this is helpful.

Thanks.

[1] https://github.com/apache/spark/blob/27209252f09ff73c58e60c6df8aaba73b308088c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L369
[2] https://issues.apache.org/jira/browse/SPARK-15463




Spark CSV skip lines

2016-09-10 Thread Selvam Raman
Hi,

I am using spark-csv to read a CSV file. The issue is that the first n lines
of my file contain a report, followed by the actual data (a header and the
rest of the data).

So how can I skip the first n lines in spark-csv? I don't have any specific
comment character in the first byte.

Please give me some ideas.

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: SparkSQL DAG generation , DAG optimization , DAG execution

2016-09-10 Thread Rabin Banerjee
Hi,


   1. You are doing some analytics I guess?  *YES*
   2. It is almost impossible to guess what is happening except that you
   are looping 50 times over the same set of sql?  *I am not looping over
   any SQL. All SQLs are called exactly once, and each requires the output
   of the previous SQL.*
   3. Your sql step n depends on step n-1. So spark cannot get rid of 1
   - n steps. *TRUE. But if I have N SQLs and the i-th SQL depends on the
   (i-1)-th, how does Spark optimize the memory? Does it start execution
   from STAGE 0 for each i-th SQL?*
   4. you are not storing anything in memory (no cache, no persist), so all
   memory is used for the execution. *If Spark is not storing anything in
   memory, then when it executes the i-th SQL, will it start execution
   from STAGE 0, i.e. starting from the file read?*
   5. What happens when you run it only once? How much memory is used (look
   at UI page, 4040 by default)? *I checked the Spark UI DAG: so many file
   reads. Why?*
   6. What Spark mode is being used (Local, Standalone, Yarn)? *Yarn*
   7. OOM could be anything depending on how much you are allocating to
   your driver memory in spark-submit? *Driver and executor memory are set
   to 4 GB, the input data size is less than 1 GB, and the number of
   executors is 5.*

*I am still a bit confused about Spark's execution plan for multiple SQLs
with only one action.*

*Is it executing each SQL separately and trying to store the intermediate
results in memory, which is causing the OOM/GC overhead?*
*And my questions still are:*

*1. Will Spark optimize multiple SQL queries into one single physical plan,
which will at least not execute the same stage twice, read each file once, ...?*
*2. In the DAG I can see a lot of file reads and a lot of stages. Why? I only
called an action once. Why does Spark start again from file reading in
multiple stages?*
*3. Will every SQL execute and its intermediate result be stored in
memory?*
*4. What is causing the OOM and GC overhead here?*
*5. What optimizations could be applied?*
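
One possible explanation for the many file reads, offered as a guess because the thread does not show the SQL: if some of the 50 statements reference an earlier result more than once (a self-join or a union, say), each reference would typically replay that result's whole lineage when nothing is persisted. The toy arithmetic below is plain Python, not Spark. The function name and its parameters are invented for illustration.

```python
def total_file_scans(steps, refs_per_step, persist_every=0):
    """Count base-file scans needed to evaluate a chain of `steps` queries.

    refs_per_step: how many times each query references the previous result
                   (1 = plain chain, 2 = e.g. a self-join or union).
    persist_every: materialize every k-th result (0 = never persist).
    """
    paid = 0      # scans already spent materializing persisted results
    pending = 1   # scans needed to rebuild the current result from scratch
    for step in range(1, steps + 1):
        pending *= refs_per_step        # each reference replays the lineage
        if persist_every and step % persist_every == 0:
            paid += pending             # compute once and keep the result
            pending = 0                 # later steps read the stored copy
    return paid + pending
```

Under this model a plain chain needs a single scan however long it is, while a chain in which every step references the previous one twice doubles the scans at each step. Persisting an intermediate result now and then cuts the growth, which is why caching selected intermediate DataFrames may be worth trying here.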


On Sat, Sep 10, 2016 at 11:35 AM, Mich Talebzadeh  wrote:

> Hi
>
>    1. You are doing some analytics I guess?
>    2. It is almost impossible to guess what is happening except that you
>    are looping 50 times over the same set of sql?
>    3. Your sql step n depends on step n-1. So spark cannot get rid of 1
>    - n steps
>    4. you are not storing anything in memory (no cache, no persist), so
>    all memory is used for the execution
>    5. What happens when you run it only once? How much memory is used
>    (look at UI page, 4040 by default)
>    6. What Spark mode is being used (Local, Standalone, Yarn)
>    7. OOM could be anything depending on how much you are allocating to
>    your driver memory in spark-submit
>
> HTH
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
> On 10 September 2016 at 06:21, Rabin Banerjee <
> dev.rabin.baner...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am writing and executing a Spark batch program which only uses
>> Spark SQL, but it is taking a lot of time and finally gives a GC overhead
>> error.
>>
>> Here is the program:
>>
>> 1. Read 3 files, one medium-sized and 2 small, and register them as
>> DataFrames.
>> 2. Fire a SQL query with complex aggregation and windowing, and
>> register the result as a DataFrame.
>> 3. Repeat step 2 almost 50 times, so 50 SQL queries.
>> 4. All SQLs are sequential, i.e. the next step requires the previous
>> step's result.
>> 5. Finally save the final DataFrame. (This is the only action called.)
>>
>> Notes:
>>
>> 1. I haven't persisted the intermediate DataFrames, as I think Spark will
>> optimize multiple SQLs into one physical execution plan.
>> 2. Executor memory and driver memory are set to 4 GB, which is very high
>> given that the data size is in MB.
>>
>> Questions:
>>
>> 1. Will Spark optimize multiple SQL queries into one single physical plan?
>> 2. In the DAG I can see a lot of file reads and a lot of stages. Why? I
>> only called an action once.
>> 3. Will every SQL execute and its intermediate result be stored in
>> memory?
>> 4. What is causing the OOM and GC overhead here?
>> 5. What optimizations could be applied?
>>
>> Spark Version 1.5.x
>>
>> Thanks in advance.
>> Rabin


SparkR API problem with subsetting distributed data frame

2016-09-10 Thread Bene
Hi,

I am having a problem with the SparkR API. I need to subset a distributed
data frame so I can extract single values from it, on which I can then do
calculations.

Each row of my df has two integer values. I am creating a vector of new
values calculated as a series of sin, cos, tan functions on these two
values. Does anyone have an idea how to do this in SparkR?

So far I tried subsetting with [], [[]], subset(), but mostly I get the
error

object of type 'S4' is not subsettable 

Is there any way to do such a thing in SparkR? Any help would be greatly
appreciated! Also let me know if you need more information, code etc.
Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-API-problem-with-subsetting-distributed-data-frame-tp27688.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: spark streaming kafka connector questions

2016-09-10 Thread Cheng Yi
After some investigation, the problem I see is likely caused by a filter and
union of the DStream.
If I just do kafka-stream -- process -- output operator, then there is no
problem: each event is fetched once.
If I do
kafka-stream -- process(1) - filter a stream A for later union --|
   |_ filter a stream B  -- process(2)
-|_ A union B output process (3)
the event will be fetched 2 times; the duplicate message starts processing at
the end of process(1). See the following traces:

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*

16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator
192.168.2.6:9092 (id: 2147483647 rack: null) for group
spark-executor-testgid.

log of processing (1) for event 1

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
1401 bytes result sent to driver

16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID
36) in 3494 ms on localhost (3/3)

16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
have all completed, from pool 

16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
(*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s

16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages

16/09/10 00:11:03 INFO DAGScheduler: running: Set()

16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
ResultStage 11)

16/09/10 00:11:03 INFO DAGScheduler: failed: Set()

16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
(UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which
has no missing parents

16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)

16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
partition 2 offsets 1 -> 2

16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 ( *(fetch the same EVENT 2nd
time)*)

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
147349146 ms.0 from job set of time 147349146 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
147349146 ms (execution: 10.874 s)* (EVENT 1st time process cost 10.874
s)*

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491465000 ms.0 from job set of time 1473491465000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
1473491465000 ms (execution: 0.066 s) *(EVENT 2nd time process cost 0.066)*

and the 2nd time processing of the event finished without really doing the
work.

Help is hugely appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-connector-questions-tp27681p27687.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: spark streaming kafka connector questions

2016-09-10 Thread 毅程
Cody, Thanks for the message.

1. As you mentioned, I did find the version for Kafka 0.10.1. I will use
that, although it has lots of experimental tags. Thank you.
2. I have investigated thoroughly; it is NOT the scenario where the 1st
process failed and then the 2nd process was triggered.
3. I do agree that the session timeout and auto commit are not the root
cause here.
4. The problem I see is likely caused by a filter and union of the DStream
(I will try to elaborate in another question post).
If I just do kafka-stream -- process -- output operator, then there is no
problem.
If I do
kafka-stream -- process(1) - filter a stream A for later union --|
   |_ filter a stream B  -- process(2)
-|_ A union B output process (3)
the duplicate message starts being processed at the end of process(1); see
the following traces:

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*

16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator
192.168.2.6:9092 (id: 2147483647 rack: null) for group
spark-executor-testgid.

log of processing (1) for event 1

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
1401 bytes result sent to driver

16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID
36) in 3494 ms on localhost (3/3)

16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
have all completed, from pool

16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
(*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s

16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages

16/09/10 00:11:03 INFO DAGScheduler: running: Set()

16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
ResultStage 11)

16/09/10 00:11:03 INFO DAGScheduler: failed: Set()

16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
(UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which
has no missing parents

16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)

16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
partition 2 offsets 1 -> 2

16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch the same EVENT 2nd
time)*

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
147349146 ms.0 from job set of time 147349146 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
147349146 ms (execution: 10.874 s) (EVENT 1st time process cost 10.874
s)

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491465000 ms.0 from job set of time 1473491465000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
1473491465000 ms (execution: 0.066 s) (EVENT 2nd time process cost 0.066)

and the 2nd time processing of the event finished without really doing the
work.
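
One possible reading of the traces above, which nothing in this thread
confirms, is that the stream produced by process(1) is not cached, so each
branch that feeds the union recomputes its lineage back to the Kafka fetch.
The toy model below is plain Python, not Spark's API; `Node`, `run_pipeline`
and the sample events are all made up for illustration. It only shows why an
uncached parent shared by two branches is evaluated twice.

```python
class Node:
    """Toy lazy dataset: compute() re-runs its parents unless cached."""

    def __init__(self, fn, parents=(), cache=False):
        self.fn = fn
        self.parents = parents
        self.cache = cache
        self._value = None
        self._done = False

    def compute(self):
        if self.cache and self._done:
            return self._value
        result = self.fn(*[p.compute() for p in self.parents])
        if self.cache:
            self._value, self._done = result, True
        return result


def run_pipeline(cache_processed):
    """Build source -> process(1) -> {A, B -> process(2)} -> union."""
    fetches = []

    def fetch():
        # Stands in for the Kafka fetch; we count how often it runs.
        fetches.append("fetch")
        return [("event-1", "a"), ("event-2", "b")]

    source = Node(fetch)
    processed = Node(lambda rows: [(k, v.upper()) for k, v in rows],
                     (source,), cache=cache_processed)
    stream_a = Node(lambda rows: [r for r in rows if r[1] == "A"], (processed,))
    stream_b = Node(lambda rows: [r for r in rows if r[1] == "B"], (processed,))
    processed_b = Node(lambda rows: [(k, v + "!") for k, v in rows], (stream_b,))
    union = Node(lambda a, b: a + b, (stream_a, processed_b))
    return union.compute(), len(fetches)


if __name__ == "__main__":
    print(run_pipeline(cache_processed=False))  # source fetched once per branch
    print(run_pipeline(cache_processed=True))   # source fetched once in total
```

In real Spark Streaming code the analogous step would be calling cache() or
persist() on the stream that comes out of process(1) before filtering it;
whether that resolves the duplicate here is an assumption to test, not a
confirmed fix.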

2016-09-08 14:55 GMT-07:00 Cody Koeninger :

> - If you're seeing repeated attempts to process the same message, you
> should be able to look in the UI or logs and see that a task has
> failed.  Figure out why that task failed before chasing other things
>
> - You're not using the latest version, the latest version is for spark
> 2.0.  There are two versions of the connector for spark 2.0, one for
> kafka 0.8 or higher, and one for kafka 0.10 or higher
>
> - Committing individual messages to kafka doesn't make any sense,
> spark streaming deals with batches.  If you're doing any aggregations
> that involve shuffling, there isn't even a guarantee that you'll
> process messages in order for a given topicpartition
>
> - Auto commit has no effect for the 0.8 version of createDirectStream.
> Turning it on for the 0.10 version of createDirectStream is a really
> bad idea, it will give you undefined delivery semantics, because the
> commit to Kafka is unrelated to whether the batch processed
> successfully
>
> If you're unclear on how the kafka integration works, see
>
> https://github.com/koeninger/kafka-exactly-once
>
> On Thu, Sep 8, 2016 at 4:44 PM, Cheng Yi  wrote:
> > I am using the latest streaming kafka connector
> > groupId: org.apache.spark
> > artifactId: spark-streaming-kafka_2.11
> > version: 1.6.2
> >
> > I am facing the problem that a message is delivered two times to my
> > consumers. These two deliveries are 10+ seconds apart. It looks like this
> > is caused by my lengthy message processing (which took about 60 seconds).
> > I tried to solve this, but I am still stuck.
> >
> > 1. It looks like the kafka streaming connector supports kafka v0.8 and
> > maybe v0.9, but not v0.10
> >
> > JavaPairInputDStream ds = KafkaUtils.createDirectStream(
> >     jsc,
> >     String.class, String.class,
> >     StringDecoder.class, StringDecoder.class,
> >     kafkaParams, topicsSet);
> >
> > 2. 
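
Cody's point above about auto commit can be illustrated with a small
simulation. This is a sketch in plain Python, not the Kafka or Spark API:
`process_batch`, `run`, the message names and the single injected failure are
all hypothetical. It contrasts committing offsets independently of the batch
outcome with committing only after the batch has succeeded.

```python
def process_batch(messages, committed, batch_size, commit_early, fail_on=None):
    """Process one batch from the committed offset.

    Returns (output, new_committed_offset). Output is empty if the batch fails.
    """
    batch = messages[committed:committed + batch_size]
    end = committed + len(batch)
    if commit_early:
        committed = end  # commit happens regardless of the batch outcome
    output = []
    try:
        for message in batch:
            if message == fail_on:
                raise RuntimeError("simulated task failure")
            output.append(message)
    except RuntimeError:
        return [], committed  # failed batch produces no output
    return output, end  # successful batch: offsets advance


def run(messages, commit_early):
    """First attempt fails on 'm1'; the next attempt runs without failures."""
    delivered, committed = [], 0
    for fail_on in ("m1", None):
        output, committed = process_batch(messages, committed, 3,
                                          commit_early, fail_on)
        delivered.extend(output)
    return delivered


if __name__ == "__main__":
    msgs = ["m0", "m1", "m2", "m3", "m4", "m5"]
    print(run(msgs, commit_early=True))   # first batch is skipped after the failure
    print(run(msgs, commit_early=False))  # first batch is retried
```

The early-commit run never delivers the first three messages, which is one
concrete way the commit can drift away from what was actually processed.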

Re: pyspark persist MEMORY_ONLY vs MEMORY_AND_DISK

2016-09-10 Thread Josh Rosen
Based on Ben's helpful error description, I managed to reproduce this bug
and found the root cause:

There's a bug in MemoryStore's PartiallySerializedBlock class: it doesn't
close a serialization stream before attempting to deserialize its
serialized values, causing it to miss any data stored in the serializer's
internal buffers (which can happen with KryoSerializer, which was
automatically being used to serialize RDDs of byte arrays). I've reported
this as https://issues.apache.org/jira/browse/SPARK-17491 and have submitted
 https://github.com/apache/spark/pull/15043 to fix this (I'm still planning
to add more tests to that patch).
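
As a rough analogy for this class of bug (a Python stand-in, not the actual
MemoryStore or Kryo code), the sketch below pickles values through a buffered
stream and then reads the underlying bytes either with or without flushing
first. The helper names `serialize` and `deserialize` and the 64-byte buffer
are choices made for the illustration only.

```python
import io
import pickle


def serialize(values, flush_first):
    """Pickle values through a buffered stream; return the bytes in the sink."""
    sink = io.BytesIO()
    # Small buffer so that part of the data is still buffered at the end.
    stream = io.BufferedWriter(sink, buffer_size=64)
    for value in values:
        pickle.dump(value, stream)
    if flush_first:
        stream.flush()  # push buffered bytes into the sink before reading it
    return sink.getvalue()


def deserialize(data):
    """Read back as many complete pickled values as the bytes contain."""
    source = io.BytesIO(data)
    values = []
    while True:
        try:
            values.append(pickle.load(source))
        except (EOFError, pickle.UnpicklingError):
            return values


if __name__ == "__main__":
    values = list(range(100))
    print(len(deserialize(serialize(values, flush_first=True))))
    print(len(deserialize(serialize(values, flush_first=False))))
```

Without the flush, the tail of the data never reaches the sink, so the reader
silently sees fewer records and raises no error, which matches the "count is
too small" symptom Ben describes below.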

On Fri, Sep 9, 2016 at 10:37 AM Josh Rosen  wrote:

> cache() / persist() is definitely *not* supposed to affect the result of
> a program, so the behavior that you're seeing is unexpected.
>
> I'll try to reproduce this myself by caching in PySpark under heavy memory
> pressure, but in the meantime the following questions will help me to debug:
>
> - Does this only happen in Spark 2.0? Have you successfully run the
>   same workload with correct behavior on an earlier Spark version, such as
>   1.6.x?
> - How accurately does your example code model the structure of your
>   real code? Are you calling cache()/persist() on an RDD which has been
>   transformed in Python or are you calling it on an untransformed input RDD
>   (such as the RDD returned from sc.textFile() / sc.hadoopFile())?
>
>
> On Fri, Sep 9, 2016 at 5:01 AM Ben Leslie  wrote:
>
>> Hi,
>>
>> I'm trying to understand if there is any difference in correctness
>> between rdd.persist(pyspark.StorageLevel.MEMORY_ONLY) and
>> rdd.persist(pyspark.StorageLevel.MEMORY_AND_DISK).
>>
>> I can see that there may be differences in performance, but my
>> expectation was that using either would result in the same behaviour.
>> However that is not what I'm seeing in practice.
>>
>> Specifically I have some code like:
>>
>> text_lines = sc.textFile(input_files)
>> records = text_lines.map(json.loads)
>> records.persist(pyspark.StorageLevel.MEMORY_ONLY)
>> count = records.count()
>> records.unpersist()
>>
>> When I do not use persist at all the 'count' variable contains the
>> correct value.
>> When I use persist with pyspark.StorageLevel.MEMORY_AND_DISK, I also
>> get the correct, expected value.
>> However, if I use persist with no argument (or
>> pyspark.StorageLevel.MEMORY_ONLY) then the value of 'count' is too
>> small.
>>
>> In all cases the script completes without errors (or warnings, as far as
>> I can tell).
>>
>> I'm using Spark 2.0.0 on an AWS EMR cluster.
>>
>> It appears that the executors may not have enough memory to store all
>> the RDD partitions in memory only, however I thought in this case it
>> would fall back to regenerating from the parent RDD, rather than
>> providing the wrong answer.
>>
>> Is this the expected behaviour? It seems a little difficult to work
>> with in practice.
>>
>> Cheers,
>>
>> Ben


Re: SparkSQL DAG generation , DAG optimization , DAG execution

2016-09-10 Thread Mich Talebzadeh
Hi

   1. You are doing some analytics, I guess?
   2. It is almost impossible to guess what is happening, except that you
   are looping 50 times over the same set of SQL?
   3. Your SQL step n depends on step n-1, so Spark cannot get rid of
   steps 1 to n.
   4. You are not storing anything in memory (no cache, no persist), so all
   memory is used for execution.
   5. What happens when you run it only once? How much memory is used (look
   at the UI page, port 4040 by default)?
   6. What Spark mode is being used (Local, Standalone, YARN)?
   7. The OOM could be caused by anything, depending on how much memory you
   are allocating to your driver in spark-submit.
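
To make point 3 a bit more concrete: even with a single action, the plan still
contains every dependent step, and each shuffle (aggregations and window
functions typically need one) starts a new stage. The sketch below is a
deliberately simplified model in plain Python, not Spark's scheduler; the
operation names, `WIDE_OPS`, and the assumption that every wide operation
needs its own shuffle are illustrative only.

```python
# Operations assumed to need a shuffle in this toy model.
WIDE_OPS = {"aggregate", "window", "join"}


def split_into_stages(ops):
    """Group a linear chain of operations into stages, cutting at each wide op."""
    stages = [[]]
    for op in ops:
        if op in WIDE_OPS and stages[-1]:
            stages.append([])  # a shuffle boundary closes the current stage
        stages[-1].append(op)
    return stages


if __name__ == "__main__":
    # One file scan followed by 50 chained queries, each with an
    # aggregation and a window function, and a single action at the end.
    chain = ["scan"] + ["aggregate", "window"] * 50
    print(len(split_into_stages(chain)))
```

Under these assumptions, 50 chained queries yield on the order of a hundred
stages for one action, so seeing many stages in the DAG is not by itself a
sign that something is being executed more than once.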

HTH





Dr Mich Talebzadeh






On 10 September 2016 at 06:21, Rabin Banerjee 
wrote:

> Hi All,
>
> I am writing and executing a Spark batch program which only uses Spark SQL,
> but it is taking a lot of time and finally giving a GC overhead error.
>
> Here is the program:
>
> 1. Read 3 files, one medium-sized and 2 small, and register them as DFs.
> 2. Fire SQL with complex aggregation and windowing, and register the
> result as a DF.
> 3. Repeat step 2 almost 50 times, so 50 SQL queries.
> 4. All SQL queries are sequential, i.e. the next step requires the previous
> step's result.
> 5. Finally save the final DF. (This is the only action called.)
>
> Note:
>
> 1. I haven't persisted the intermediate DFs, as I think Spark will optimize
> multiple SQL queries into one physical execution plan.
> 2. Executor memory and driver memory are set to 4 GB, which is too high, as
> the data size is in MB.
>
> Questions:
>
> 1. Will Spark optimize multiple SQL queries into one single physical plan?
> 2. In the DAG I can see a lot of file reads and a lot of stages. Why? I
> only called an action once.
> 3. Will every SQL query execute and have its intermediate result stored in
> memory?
> 4. What is causing the OOM and GC overhead here?
> 5. What optimizations could be applied?
>
> Spark version 1.5.x
>
>
> Thanks in advance.
> Rabin