Re: Behind the scene of RDD to DataFrame

2016-02-20 Thread Hemant Bhanawat
toDF internally calls sqlContext.createDataFrame, which transforms the RDD
to an RDD[InternalRow]. This RDD[InternalRow] is then mapped to a DataFrame.

Type conversions (from scala types to catalyst types) are involved but no
shuffling.
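
For illustration, a minimal sketch (the case class and data are made up, assuming an existing sqlContext as in spark-shell):

import sqlContext.implicits._

case class Point(x: Int, y: Int)

val rdd = sc.parallelize(Seq(Point(1, 2), Point(3, 4)))

// toDF() converts each Point to an InternalRow within its own partition --
// a narrow transformation, so no shuffle is triggered
val df = rdd.toDF()
df.printSchema()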

Hemant Bhanawat 
www.snappydata.io

On Sun, Feb 21, 2016 at 11:48 AM, Weiwei Zhang 
wrote:

> Hi there,
>
> Could someone explain to me what is behind the scenes of rdd.toDF()? More
> importantly, will this step involve a lot of shuffles and cause a surge
> in the size of intermediate files? Thank you.
>
> Best Regards,
> Vivian
>


Behind the scene of RDD to DataFrame

2016-02-20 Thread Weiwei Zhang
Hi there,

Could someone explain to me what is behind the scenes of rdd.toDF()? More
importantly, will this step involve a lot of shuffles and cause a surge
in the size of intermediate files? Thank you.

Best Regards,
Vivian


Re: Element appear in both 2 splits of RDD after using randomSplit

2016-02-20 Thread Ted Yu
Have you looked at:
SPARK-12662 Fix DataFrame.randomSplit to avoid creating overlapping splits
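
A common workaround on the RDD side, assuming the overlap comes from the splits being recomputed non-deterministically upstream (e.g. after a shuffle such as distinct), is to persist the RDD and fix the seed before splitting; a sketch:

// persist so both splits sample the same materialized data
userProducts.cache()
val Array(train, test) = userProducts.randomSplit(Array(0.7, 0.3), seed = 11L)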

Cheers

On Sat, Feb 20, 2016 at 7:01 PM, tuan3w  wrote:

> I'm training a model using MLlib. When I try to split data into training
> and test data, I found a weird problem. I can't figure out what is
> happening here.
>
> Here is my code in experiment:
>
> val  logData = rdd.map(x => (x._1, x._2)).distinct()
> val ratings: RDD[Rating] = logData.map(x => Rating(x._1, x._2, 1))
> val userProducts = ratings.map(x => (x.user, x.product))
> val splits = userProducts.randomSplit(Array(0.7, 0.3))
> val train = splits(0)
> train.count() // 1660895
> val test = splits(1)
> test.count() // 712306
> // test if an element appear in both splits
> train.map(x => (x._1 + "_" + x._2, 1)).join(test.map(x => (x._1 + "_" +
> x._2, 2))).take(5)
> //return res153: Array[(String, (Int, Int))] = Array((1172491_2899,(1,2)),
> (1206777_1567,(1,2)), (91828_571,(1,2)), (329210_2435,(1,2)),
> (24356_135,(1,2)))
>
> If I try to save to hdfs and load RDD from HDFS this problem doesn't
> happen.
>
> userProducts.map(x => x._1 + ":" +
> x._2).saveAsTextFile("/user/tuannd/test2.txt")
> val userProducts = sc.textFile("/user/tuannd/test2.txt").map(x => {
> val d =x.split(":")
> (d(0).toInt, d(1).toInt)
> })
> // other steps are as same as above
>
> I'm using spark 1.5.2.
> Thanks for all your help.
>
>
>
>
>
>
>


Re: how to set database in DataFrame.saveAsTable?

2016-02-20 Thread Glen
Any example code?

In pyspark:
sqlContext.sql("use mytable")
my_df.saveAsTable("tmp_spark_debug", mode="overwrite")

1. The code above does not seem to register the table in Hive. I have to create the
table from HDFS in Hive myself, and it reports format errors: rcformat and parquet.
2. Rerunning saveAsTable with mode="overwrite" reports that
the table already exists.
3. Sometimes it creates a directory in hive/warehouse/tmp_spark_debug, not
in hive/warehouse/mytable/tmp_spark_debug.


My goal is simple:
df.saveAsTable('blablabla')  // create a hive table in some database, then
it can be visited by hive.

I have tried many times; it seems there are a lot of bugs in pyspark, or maybe my
method is wrong?

2016-02-21 10:04 GMT+08:00 gen tang :

> Hi,
>
> You can use
> sqlContext.sql("use <database_name>")
> before using dataframe.saveAsTable
>
> Hope it could be helpful
>
> Cheers
> Gen
>
>
> On Sun, Feb 21, 2016 at 9:55 AM, Glen  wrote:
>
>> For dataframe in spark, so the table can be visited by hive.
>>
>> --
>> Jacky Wang
>>
>
>


-- 
Jacky Wang


Element appear in both 2 splits of RDD after using randomSplit

2016-02-20 Thread tuan3w
I'm training a model using MLlib. When I try to split data into training and
test data, I found a weird problem. I can't figure out what is happening
here.

Here is my code in experiment: 

val  logData = rdd.map(x => (x._1, x._2)).distinct()
val ratings: RDD[Rating] = logData.map(x => Rating(x._1, x._2, 1))
val userProducts = ratings.map(x => (x.user, x.product))
val splits = userProducts.randomSplit(Array(0.7, 0.3))
val train = splits(0)
train.count() // 1660895
val test = splits(1)
test.count() // 712306
// test if an element appear in both splits
train.map(x => (x._1 + "_" + x._2, 1)).join(test.map(x => (x._1 + "_" +
x._2, 2))).take(5)
//return res153: Array[(String, (Int, Int))] = Array((1172491_2899,(1,2)),
(1206777_1567,(1,2)), (91828_571,(1,2)), (329210_2435,(1,2)),
(24356_135,(1,2)))

If I try to save to hdfs and load RDD from HDFS this problem doesn't happen.

userProducts.map(x => x._1 + ":" +
x._2).saveAsTextFile("/user/tuannd/test2.txt")
val userProducts = sc.textFile("/user/tuannd/test2.txt").map(x => {
val d =x.split(":")
(d(0).toInt, d(1).toInt)
})
// other steps are as same as above

I'm using spark 1.5.2.
Thanks for all your help.








Re: how to set database in DataFrame.saveAsTable?

2016-02-20 Thread gen tang
Hi,

You can use
sqlContext.sql("use <database_name>")
before using dataframe.saveAsTable
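
A sketch in Scala (database and table names are illustrative, and myDf stands for your DataFrame); note that saveAsTable writes a Spark-managed Parquet table by default, which plain Hive may not be able to read directly:

sqlContext.sql("CREATE DATABASE IF NOT EXISTS mydb")
sqlContext.sql("use mydb")
// creates/overwrites the managed table mydb.tmp_spark_debug
myDf.write.mode("overwrite").saveAsTable("tmp_spark_debug")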

Hope it could be helpful

Cheers
Gen


On Sun, Feb 21, 2016 at 9:55 AM, Glen  wrote:

> For dataframe in spark, so the table can be visited by hive.
>
> --
> Jacky Wang
>


how to set database in DataFrame.saveAsTable?

2016-02-20 Thread Glen
For dataframe in spark, so the table can be visited by hive.

-- 
Jacky Wang


Re: Fair scheduler pool details

2016-02-20 Thread Mark Hamstra
It's 2 -- and it's pretty hard to point to a line of code, a method, or
even a class since the scheduling of Tasks involves a pretty complex
interaction of several Spark components -- mostly the DAGScheduler,
TaskScheduler/TaskSchedulerImpl, TaskSetManager, Schedulable and Pool, as
well as the SchedulerBackend (CoarseGrainedSchedulerBackend in this case.)
 The key thing to understand, though, is the comment at the top of
SchedulerBackend.scala: "A backend interface for scheduling systems that
allows plugging in different ones under TaskSchedulerImpl. We assume a
Mesos-like model where the application gets resource offers as machines
become available and can launch tasks on them."  In other words, the whole
scheduling system is built on a model that starts with offers made by
workers when resources are available to run Tasks.  Other than the big
hammer of canceling a Job while interruptOnCancel is true, there isn't
really any facility for stopping or rescheduling Tasks that are already
started, so that rules out your option 1.  Similarly, option 3 is out
because the scheduler doesn't know when Tasks will complete; it just knows
when a new offer comes in and it is time to send more Tasks to be run on
the machine making the offer.

What actually happens is that the Pool with which a Job is associated
maintains a queue of TaskSets needing to be scheduled.  When in
resourceOffers the TaskSchedulerImpl needs sortedTaskSets, the Pool
supplies those from its scheduling queue after first sorting it according
to the Pool's taskSetSchedulingAlgorithm.  In other words, what Spark's
fair scheduling does in essence is, in response to worker resource offers,
to send new Tasks to be run; those Tasks are taken in sets from the queue
of waiting TaskSets, sorted according to a scheduling algorithm.  There is
no pre-emption or rescheduling of Tasks that the scheduler has already sent
to the workers, nor is there any attempt to anticipate when already running
Tasks will complete.
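
For reference, the user-facing side of what the Pool sorting acts on looks roughly like this (a sketch; pool names, file path and the RDDs are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-pools-sketch")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new SparkContext(conf)

// thread 1: jobs submitted from this thread go to the small, high-priority pool
sc.setLocalProperty("spark.scheduler.pool", "important")
sc.parallelize(1 to 1000).count()

// thread 2: jobs go to the greedy pool; new resource offers are shared between
// the pools according to their weights/minShare, but tasks that are already
// running are never preempted
sc.setLocalProperty("spark.scheduler.pool", "greedy")
sc.parallelize(1 to 1000000).count()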


On Sat, Feb 20, 2016 at 4:14 PM, Eugene Morozov 
wrote:

> Hi,
>
> I'm trying to understand how this works underneath. Let's say I have
> two types of jobs: high-priority ones, which might use a small number of cores
> but have to run fast, and less important but greedy ones, which use as
> many cores as are available. So, the idea is to use two corresponding pools.
>
> The thing I'm trying to understand is the following.
> I use a standalone Spark deployment (no YARN, no Mesos).
> Let's say the less important job took all the cores and then someone runs a
> high-priority job. Then I see three possibilities:
> 1. Spark kills some executors that are currently running less important partitions
> and assigns them to the high-priority job.
> 2. Spark waits until some partitions of the less important job are
> completely processed, and the first executors that become free are
> assigned to the high-priority job.
> 3. Spark figures out when particular stages of
> partitions of the less important job are done, and instead of continuing with
> that job, those executors are reassigned to the high-priority one.
>
> Which one is it? Could you please point me to a class / method / line of
> code?
> --
> Be well!
> Jean Morozov
>


RE: Spark JDBC connection - data writing success or failure cases

2016-02-20 Thread Mich Talebzadeh
OK, as I understand it you mean pushing data from Spark to an Oracle database via
JDBC? Correct.

 

There are a number of ways to do so.

 

The most common way is using Sqoop to get the data from an HDFS file or Hive table
into the Oracle database. With Spark you can also use that method, by storing the data in
a Hive table and using Sqoop to do the job, or by referencing directly where
the data is stored.

 

Sqoop uses JDBC for this work and I believe it delivers data in batch
transactions configurable with "export.statements.per.transaction". Have a look
at sqoop export --help.

 

With batch transactions, depending on the size of the batch, you may have a
partial delivery of data in case of issues such as a network failure or running
out of space on the Oracle schema.

 

It really boils down to the volume of the data and the way this is going to
happen; say your job runs as a cron job and you do parallel processing with
multiple connections to the Oracle database.

 

In general it should work, and much like what we see loading data from an HDFS file
to Oracle, it should follow JDBC protocols.

 

One interesting concept that I would like to try is loading data from RDD -> DF
-> temporary table in Spark, then pushing the data to the Oracle DB via JDBC. I have done
it the other way round with no problem.
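
A hedged sketch of that RDD -> DF -> JDBC path (URL, credentials and table name are placeholders, df stands for the DataFrame built from the RDD, and the Oracle JDBC driver has to be on the classpath):

import java.util.Properties
import org.apache.spark.sql.SaveMode

val connProps = new Properties()
connProps.put("user", "scott")
connProps.put("password", "tiger")

// write into a staging table first, then promote inside Oracle once checked
df.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:oracle:thin:@//oracle-host:1521/SERVICE", "STAGING_TABLE", connProps)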

 

Like any load, you are effectively doing an ETL from Spark to Oracle. You are
better off loading the data into an Oracle staging table first; once it has all gone through
and you have checked the job, push the data from the staging table to the main table in
Oracle to reduce the risk of failure.

 

HTH

 

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com  

 


 

 

From: Divya Gehlot [mailto:divya.htco...@gmail.com] 
Sent: 21 February 2016 00:09
To: Mich Talebzadeh 
Cc: user @spark ; Russell Jurney 
; Jörn Franke 
Subject: RE: Spark JDBC connection - data writing success or failure cases

 

Thanks for the input, everyone.
What I am trying to understand is: if I use Oracle to store my data after Spark
job processing, and the Spark job fails halfway through, what happens then?
Does a rollback happen, or do we have to handle this kind of situation
programmatically in the Spark job itself?
How are transactions handled from Spark to Oracle storage?
My apologies for such a naive question.
Thanks,
Divya 

agreed

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com  

 


 

 

From: Russell Jurney [mailto:russell.jur...@gmail.com 
 ] 
Sent: 19 February 2016 16:49
To: Jörn Franke  >
Cc: Divya Gehlot  >; 
user @spark  >
Subject: Re: Spark JDBC connection - data writing success or failure cases

 

Oracle is a perfectly reasonable endpoint for publishing data processed in 
Spark. I've got to assume he's using it that way and not as a stand in for HDFS?

On Friday, February 19, 2016, Jörn Franke  > wrote:

Generally an Oracle DB should not be used as a storage layer for Spark due to
performance reasons. You should consider HDFS. This will also help you with
fault tolerance.

> On 19 Feb 2016, at 03:35, Divya Gehlot   > wrote:
>
> Hi,
> I have a Spark job which connects to an RDBMS (in my case it's Oracle).
> How can we check that the complete data write was successful?
> Can I use commit in case of 

Fair scheduler pool details

2016-02-20 Thread Eugene Morozov
Hi,

I'm trying to understand how this works underneath. Let's say I have
two types of jobs: high-priority ones, which might use a small number of cores
but have to run fast, and less important but greedy ones, which use as
many cores as are available. So, the idea is to use two corresponding pools.

The thing I'm trying to understand is the following.
I use a standalone Spark deployment (no YARN, no Mesos).
Let's say the less important job took all the cores and then someone runs a
high-priority job. Then I see three possibilities:
1. Spark kills some executors that are currently running less important partitions
and assigns them to the high-priority job.
2. Spark waits until some partitions of the less important job are
completely processed, and the first executors that become free are
assigned to the high-priority job.
3. Spark figures out when particular stages of
partitions of the less important job are done, and instead of continuing with
that job, those executors are reassigned to the high-priority one.

Which one is it? Could you please point me to a class / method / line of
code?
--
Be well!
Jean Morozov


Re: Spark Job Hanging on Join

2016-02-20 Thread Dave Moyers
Try this setting in your Spark defaults:

spark.sql.autoBroadcastJoinThreshold=-1

I had a similar problem with joins hanging and that resolved it for me. 

You might be able to pass that value from the driver as a --conf option, but I 
have not tried that, and not sure if that will work. 
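
For reference, a sketch of both ways of setting it (worth verifying on your version):

// from the driver code or the shell:
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
// or at submit time: spark-submit --conf spark.sql.autoBroadcastJoinThreshold=-1 ...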

Sent from my iPad

> On Feb 19, 2016, at 11:31 AM, Tamara Mendt  wrote:
> 
> Hi all, 
> 
> I am running a Spark job that gets stuck attempting to join two dataframes. 
> The dataframes are not very large, one is about 2 M rows, and the other a 
> couple of thousand rows and the resulting joined dataframe should be about 
> the same size as the smaller dataframe. I have tried triggering execution of 
> the join using the 'first' operator, which as far as I understand would not 
> require processing the entire resulting dataframe (maybe I am mistaken 
> though). The Spark UI is not telling me anything, just showing the task to be 
> stuck.
> 
> When I run the exact same job on a slightly smaller dataset it works without 
> hanging.
> 
> I have used the same environment to run joins on much larger dataframes, so I 
> am confused as to why in this particular case my Spark job is just hanging. I 
> have also tried running the same join operation using pyspark on two 2 
> Million row dataframes (exactly like the one I am trying to join in the job 
> that gets stuck) and it runs successfully.
> 
> I have tried caching the joined dataframe to see how much memory it is 
> requiring but the job gets stuck on this action too. I have also tried using 
> persist to memory and disk on the join, and the job seems to be stuck all the 
> same. 
> 
> Any help as to where to look for the source of the problem would be much 
> appreciated.
> 
> Cheers,
> 
> Tamara
> 




RE: Checking for null values when mapping

2016-02-20 Thread Mich Talebzadeh
OK, it turned out a bit less complicated than I thought :). I would be
interested to know whether creating a temporary table from the DF and pushing the
data into Hive is the best option here.

 

1. Prepare and clean up data with filter & map

2. Convert the RDD to a DF

3. Create a temporary table from the DF

4. Use the Hive database

5. Drop if exists and create the ORC table in the Hive database

6. Simple insert/select from the temporary table into the Hive table

 

//

// Get a DF first based on Databricks CSV libraries

//

val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")

//

// Next filter out empty rows (the last column has to be > "") and get rid of the "?"
// special character. Also get rid of "," in money fields

// Example csv cell £2,500.00 --> needs to be transformed to plain 2500.00

//

val a = df.filter(col("Total") > "").map(x => (x.getString(0),x.getString(1), 
x.getString(2).substring(1).replace(",", "").toDouble, 
x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))

 

a.first

//

// convert this RDD to DF and create a Spark temporary table

//

a.toDF.registerTempTable("tmp")

//

// Need to create and populate target ORC table t3 in database test in Hive

//

sql("use test")

//

// Drop and create table t3

//

sql("DROP TABLE IF EXISTS t3")

var sqltext : String = ""

sqltext = """

CREATE TABLE t3 (

INVOICENUMBER   INT

,PAYMENTDATE    timestamp

,NET            DECIMAL(20,2)

,VAT            DECIMAL(20,2)

,TOTAL          DECIMAL(20,2)

)

COMMENT 'from csv file from excel sheet'

STORED AS ORC

TBLPROPERTIES ( "orc.compress"="ZLIB" )

"""

sql(sqltext)

//

// Put data in Hive table. Clean up is already done

//

sqltext = "INSERT INTO t3 SELECT * FROM tmp"

sql(sqltext)

sql("SELECT * FROM t3 ORDER BY 1").collect.foreach(println)

sys.exit()

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com  

 


 

 

From: Ted Yu [mailto:yuzhih...@gmail.com] 
Sent: 20 February 2016 12:33
To: Mich Talebzadeh 
Cc: Michał Zieliński ; user @spark 

Subject: Re: Checking for null values when mapping

 

For #2, you can filter out rows whose first column has length 0. 

 

Cheers


On Feb 20, 2016, at 6:59 AM, Mich Talebzadeh  > wrote:

Thanks

 

 

So what I did was

 

scala> val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")

df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment date: 
string, Net: string, VAT: string, Total: string]

 

 

scala> df.printSchema

root

|-- Invoice Number: string (nullable = true)

|-- Payment date: string (nullable = true)

|-- Net: string (nullable = true)

|-- VAT: string (nullable = true)

|-- Total: string (nullable = true)

 

 

So all the columns are Strings

 

Then I tried to exclude null rows by filtering on all columns not being null 
and map the rest

 

scala> val a = df.filter(col("Invoice Number").isNotNull and col("Payment 
date").isNotNull and col("Net").isNotNull and col("VAT").isNotNull and 
col("Total").isNotNull).map(x => 
(x.getString(1),x.getString(2).substring(1).replace(",", 
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))

 

a: org.apache.spark.rdd.RDD[(String, Double, Double, Double)] = 
MapPartitionsRDD[176] at map at :21

 

This still comes up with “String index out of range: “ error

 

16/02/20 11:50:51 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 18)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

My questions are:

 

1.  Doing the map,  map(x => (x.getString(1)  -- Can I replace 
x.getString(1) with the actual column name say “Invoice Number” and so forth 
for other columns as well?

2.  Sounds like it crashes because of these columns below at the end

[421,02/10/2015,?1,187.50,?237.50,?1,425.00]  \\ example good one

[] \\ bad one, empty one

[Net income,,?182,531.25,?14,606.25,?197,137.50] 

[] \\ bad one, empty one

[year 

Re: JDBC based access to RDD

2016-02-20 Thread Shyam Sarkar
JdbcRDD.scala code is under the source code directory
 spark/core/src/main/scala/org/apache/spark/rdd
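
A minimal JdbcRDD sketch (driver class, URL, credentials and query are placeholders; the query needs the two '?' bind parameters used for the partition bounds):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val jdbcRdd = new JdbcRDD(
  sc,
  () => {
    Class.forName("org.postgresql.Driver")
    DriverManager.getConnection("jdbc:postgresql://dbhost:5432/mydb", "user", "pass")
  },
  "SELECT id, name FROM people WHERE id >= ? AND id <= ?",
  lowerBound = 1L, upperBound = 1000000L, numPartitions = 10,
  mapRow = rs => (rs.getInt(1), rs.getString(2)))

jdbcRdd.take(5).foreach(println)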

Thanks.

On Sat, Feb 20, 2016 at 11:51 AM, Shyam Sarkar  wrote:

> I was going through the scala code implementing RDD in Spark 1.6 source
> code and I found JdbcRDD functions defined for JDBC
> based access to RDD data.  I want to see some example code how to use such
> functions.
>
> Thanks,
>
>
>
>
> On Thu, Feb 18, 2016 at 2:43 PM, Mich Talebzadeh 
> wrote:
>
>> Can you please clarify your point
>>
>>
>>
>> Do you mean using JDBC to get data from other databases into Spark
>>
>>
>>
>>
>>
>> val s = HiveContext.load("jdbc",
>>
>> Map("url" -> _ORACLEserver,
>>
>> "dbtable" -> "table”,
>>
>> "user" -> _username,
>>
>> "password" -> _password))
>>
>>
>>
>> HTH
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>>
>>
>>
>>
>> *From:* Shyam Sarkar [mailto:ssarkarayushnet...@gmail.com]
>> *Sent:* 18 February 2016 22:36
>> *To:* user@spark.apache.org
>> *Subject:* JDBC based access to RDD
>>
>>
>>
>> Is there any good code example for JDBC based access to RDD ?
>>
>> Thanks.
>>
>
>


Re: Spark Streaming: Is it possible to schedule multiple active batches?

2016-02-20 Thread Neelesh
spark.streaming.concurrentJobs may help. Experimental according to TD from
an older thread here
http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
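
A sketch of setting it (the value is illustrative):

import org.apache.spark.SparkConf

// allow up to 3 streaming batches to be processed concurrently
val conf = new SparkConf().set("spark.streaming.concurrentJobs", "3")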

On Sat, Feb 20, 2016 at 11:24 AM, Jorge Rodriguez 
wrote:

>
> Is it possible to have the scheduler schedule the next batch even if the
> previous batch has not completed yet?  I'd like to schedule up to 3 batches
> at the same time if this is possible.
>


Spark Streaming: Is it possible to schedule multiple active batches?

2016-02-20 Thread Jorge Rodriguez
Is it possible to have the scheduler schedule the next batch even if the
previous batch has not completed yet?  I'd like to schedule up to 3 batches
at the same time if this is possible.


Re: Access to broadcasted variable

2016-02-20 Thread Ilya Ganelin
It gets serialized once per physical container (executor), instead of being serialized
once per task of every stage that uses it.
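
A small sketch for reference (data is made up): the value is shipped and deserialized once per executor JVM and shared by all tasks running there.

val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))

val mapped = sc.parallelize(Seq(1, 2, 2, 1))
  .map(x => lookup.value.getOrElse(x, "?"))   // tasks read the executor-local copy
mapped.collect()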
On Sat, Feb 20, 2016 at 8:15 AM jeff saremi  wrote:

> Is the broadcasted variable distributed to every executor or every worker?
> Now I'm more confused
> I thought it was supposed to save memory by distributing it to every
> worker and the executors would share that copy
>
>
> --
> Date: Fri, 19 Feb 2016 16:48:59 -0800
> Subject: Re: Access to broadcasted variable
> From: shixi...@databricks.com
> To: jeffsar...@hotmail.com
> CC: user@spark.apache.org
>
>
> The broadcasted object is serialized in driver and sent to the executors.
> And in the executor, it will deserialize the bytes to get the broadcasted
> object.
>
> On Fri, Feb 19, 2016 at 5:54 AM, jeff saremi 
> wrote:
>
> could someone please comment on this? thanks
>
> --
> From: jeffsar...@hotmail.com
> To: user@spark.apache.org
> Subject: Access to broadcasted variable
> Date: Thu, 18 Feb 2016 14:44:07 -0500
>
>
>
> I'd like to know if the broadcasted object gets serialized when accessed
> by the executor during the execution of a task?
> I know that it gets serialized from the driver to the worker. This
> question is inside worker when executor JVM's are accessing it
>
> thanks
> Jeff
>
>
>


Re: Spark execuotr Memory profiling

2016-02-20 Thread Nirav Patel
Thanks Nilesh. I don't think there's heavy communication between the driver and
the executors. However, I'll try the settings you suggested.

I cannot replace groupBy with reduceBy as it's not an associative
operation.

It is very frustrating, to be honest. It was a piece of cake with MapReduce
compared to the amount of time I am putting into tuning Spark to make things work. To
remove the doubt that an executor might be running multiple tasks (executor.cores)
and hence having to share memory, I set executor.cores to 1 so a single task
has all 15 GB at its disposal, which is already three times what it needs for
the most skewed key. I am going to need to profile for sure to understand what
the Spark executors are doing there; for sure they are not willing to explain
the situation but rather will say 'use reduceBy'.
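
For that profiling, one way to capture executor heap dumps is via spark.executor.extraJavaOptions (a sketch; the dump path is illustrative and must exist on every worker node; note that the '-' in the -XX:-HeapDumpOnOutOfMemoryError option quoted further down actually disables the dump, so a '+' is needed):

import org.apache.spark.SparkConf

val conf = new SparkConf().set(
  "spark.executor.extraJavaOptions",
  "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/cores/spark")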





On Thu, Feb 11, 2016 at 9:42 AM, Kuchekar  wrote:

> Hi Nirav,
>
>   I faced similar issue with Yarn, EMR 1.5.2 and following
> Spark Conf helped me. You can set the values accordingly
>
> conf= (SparkConf().set("spark.master","yarn-client").setAppName("HalfWay"
> ).set("spark.driver.memory", "15G").set("spark.yarn.am.memory","15G"))
>
> conf=conf.set("spark.driver.maxResultSize","10G").set(
> "spark.storage.memoryFraction","0.6").set("spark.shuffle.memoryFraction",
> "0.6").set("spark.yarn.executor.memoryOverhead","4000")
>
> conf = conf.set("spark.executor.cores","4").set("spark.executor.memory",
> "15G").set("spark.executor.instances","6")
>
> Is it also possible to use reduceBy in place of groupBy that might help
> the shuffling too.
>
>
> Kuchekar, Nilesh
>
> On Wed, Feb 10, 2016 at 8:09 PM, Nirav Patel 
> wrote:
>
>> We have been trying to solve memory issue with a spark job that processes
>> 150GB of data (on disk). It does a groupBy operation; some of the executor
>> will receive somehwere around (2-4M scala case objects) to work with. We
>> are using following spark config:
>>
>> "executorInstances": "15",
>>
>>  "executorCores": "1", (we reduce it to one so single task gets all
>> the executorMemory! at least that's the assumption here)
>>
>>  "executorMemory": "15000m",
>>
>>  "minPartitions": "2000",
>>
>>  "taskCpus": "1",
>>
>>  "executorMemoryOverhead": "1300",
>>
>>  "shuffleManager": "tungsten-sort",
>>
>>   "storageFraction": "0.4"
>>
>>
>> This is a snippet of what we see in spark UI for a Job that fails.
>>
>> This is a *stage* of this job that fails.
>>
>> Stage Id: 5 (retry 15)
>> Pool Name: prod
>> Description: map at SparkDataJobs.scala:210 (+details)
>> Submitted: 2016/02/09 21:30:06
>> Duration: 13 min
>> Tasks (Succeeded/Total): 130/389 (16 failed)
>> Shuffle Read: 1982.6 MB
>> Shuffle Write: 818.7 MB
>> Failure Reason: org.apache.spark.shuffle.FetchFailedException: Error
>> in opening
>> FileSegmentManagedBuffer{file=/tmp/hadoop/nm-local-dir/usercache/fasd/appcache/application_1454975800192_0447/blockmgr-abb77b52-9761-457a-b67d-42a15b975d76/0c/shuffle_0_39_0.data,
>> offset=11421300, length=2353}
>>
>> This is one of the *task* attempts from the above stage that threw an OOM
>>
>> 2 22361 0 FAILED PROCESS_LOCAL 38 / nd1.mycom.local 2016/02/09 22:10:42 5.2
>> min 1.6 min 7.4 MB / 375509 java.lang.OutOfMemoryError: Java heap space
>> +details
>>
>> java.lang.OutOfMemoryError: Java heap space
>>  at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
>>  at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
>>  at 
>> org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
>>  at 
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:203)
>>  at 
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:202)
>>  at scala.collection.immutable.List.foreach(List.scala:318)
>>  at 
>> org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:202)
>>  at 
>> org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
>>  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
>>  at 
>> org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
>>  at 
>> org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
>>  at 
>> org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:3
>>
>>
>> None of the above suggests that it went over the 15 GB of memory that I initially
>> allocated. So what am I missing here? What's eating my memory?
>>
>> We tried executorJavaOpts to get heap dump but it doesn't seem to work.
>>
>> -XX:-HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -3 %p'
>> -XX:HeapDumpPath=/opt/cores/spark
>>
>> I don't see any 

RE: Access to broadcasted variable

2016-02-20 Thread jeff saremi
Is the broadcasted variable distributed to every executor or every worker? Now 
I'm more confused
I thought it was supposed to save memory by distributing it to every worker and 
the executors would share that copy


Date: Fri, 19 Feb 2016 16:48:59 -0800
Subject: Re: Access to broadcasted variable
From: shixi...@databricks.com
To: jeffsar...@hotmail.com
CC: user@spark.apache.org

The broadcasted object is serialized in driver and sent to the executors. And 
in the executor, it will deserialize the bytes to get the broadcasted object.
On Fri, Feb 19, 2016 at 5:54 AM, jeff saremi  wrote:



could someone please comment on this? thanks

From: jeffsar...@hotmail.com
To: user@spark.apache.org
Subject: Access to broadcasted variable
Date: Thu, 18 Feb 2016 14:44:07 -0500








I'd like to know if the broadcasted object gets serialized when accessed by the 
executor during the execution of a task?
I know that it gets serialized from the driver to the worker. This question is 
inside worker when executor JVM's are accessing it

thanks
Jeff
  

  

Re: Read files dynamically having different schema under one parent directory + scala + Spark 1.5.2

2016-02-20 Thread Chandeep Singh
Here is how you can list all HDFS directories for a given path.

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfsConn = org.apache.hadoop.fs.FileSystem.get(new 
java.net.URI("hdfs://:8020"), hadoopConf)
val c = hdfsConn.listStatus(new org.apache.hadoop.fs.Path("/user/csingh/"))
c.foreach(x => println(x.getPath))

Output:
hdfs:///user/csingh/.Trash
hdfs:///user/csingh/.sparkStaging
hdfs:///user/csingh/.staging
hdfs:///user/csingh/test1
hdfs:///user/csingh/test2
hdfs:///user/csingh/tmp
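
Building on that, a sketch of turning the listing into one DataFrame per sub-directory (paths and options are illustrative, and schemaFor is a hypothetical helper for your custom schemas):

val dfBySubdir = c.filter(_.isDirectory).map(_.getPath).map { p =>
  p.getName -> sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")   // or .schema(schemaFor(p.getName))
    .load(p.toString)
}.toMap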


> On Feb 20, 2016, at 2:37 PM, Divya Gehlot  wrote:
> 
> Hi,
> @Umesh: Your understanding is partially correct as per my requirement.
> My idea, which I am trying to implement, is as follows.
> Steps I am trying to follow
> (not sure how feasible it is; I am a newbie to Spark and Scala):
> 1.List all the files under parent directory 
>   hdfs :///Testdirectory/
> As list 
> For example : val listsubdirs =(subdir1,subdir2...subdir.n)
> Iterate through this list 
> for(subdir <-listsubdirs){
> val df ="df"+subdir
> df= read it using spark csv package using custom schema
> 
> }
> This will give dataframes equal to the number of subdirs.
> 
> Now I got stuck in first step itself .
> How do I list directories and put it in list ?
> 
> Hope you understood my issue now.
> Thanks,
> Divya 
> On Feb 19, 2016 6:54 PM, "UMESH CHAUDHARY"  > wrote:
> If I understood correctly, you can have many sub-dirs under 
> hdfs:///TestDirectory and you need to attach a schema to all part files 
> in a sub-dir. 
> 
> 1) I am assuming that you know the sub-dirs names :
> 
> For that, you need to list all sub-dirs inside hdfs:///TestDirectory 
> using Scala, iterate over sub-dirs 
> foreach sub-dir in the list 
> read the partfiles , identify and attach schema respective to that 
> sub-directory. 
> 
> 2) If you don't know the sub-directory names:
> You need to store schema somewhere inside that sub-directory and read it 
> in iteration.
> 
> On Fri, Feb 19, 2016 at 3:44 PM, Divya Gehlot  > wrote:
> Hi,
> I have a use case ,where I have one parent directory
> 
> File structure looks like 
> hdfs:///TestDirectory/spark1/part files( created by some spark job )
> hdfs:///TestDirectory/spark2/ part files (created by some spark job )
> 
> spark1 and spark 2 has different schema 
> 
> like spark 1  part files schema
> carname model year
> 
> Spark2 part files schema
> carowner city  carcost
> 
> 
> As these spark 1 and spark2 directory gets created dynamically 
> can have spark3 directory with different schema
> 
> My requirement is to read the parent directory, list the subdirectories,
> and create a dataframe for each subdirectory.
> 
> I am not able to get how can I list subdirectory under parent directory and 
> dynamically create dataframes.
> 
> Thanks,
> Divya 
> 
> 
> 
> 
> 



spark.driver.maxResultSize doesn't work in conf-file

2016-02-20 Thread AlexModestov
I have a string spark.driver.maxResultSize=0 in the spark-defaults.conf.
But I get an error:

"org.apache.spark.SparkException: Job aborted due to stage failure: Total
size of serialized results of 18 tasks (1070.5 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)"

But if I write --conf spark.driver.maxResultSize=0 in pyspark-shell it works
fine.

Does anyone know how to fix it?
Thank you
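
If the defaults file really is not being picked up, a fallback sketch is to set it on the SparkConf before the context is created (the same key works from pyspark's SparkConf; 0 means unlimited):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.driver.maxResultSize", "0")   // 0 = unlimited
val sc = new SparkContext(conf)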







Re: Read files dynamically having different schema under one parent directory + scala + Spark 1.5.2

2016-02-20 Thread Divya Gehlot
Hi,
@Umesh: Your understanding is partially correct as per my requirement.
My idea, which I am trying to implement, is as follows.
Steps I am trying to follow
(not sure how feasible it is; I am a newbie to Spark and Scala):
1.List all the files under parent directory
  hdfs :///Testdirectory/
As list
For example : val listsubdirs =(subdir1,subdir2...subdir.n)
Iterate through this list
for(subdir <-listsubdirs){
val df ="df"+subdir
df= read it using spark csv package using custom schema

}
This will give dataframes equal to the number of subdirs.

Now I got stuck in first step itself .
How do I list directories and put it in list ?

Hope you understood my issue now.
Thanks,
Divya
On Feb 19, 2016 6:54 PM, "UMESH CHAUDHARY"  wrote:

> If I understood correctly, you can have many sub-dirs under 
> *hdfs:///TestDirectory
> *and you need to attach a schema to all part files in a sub-dir.
>
> 1) I am assuming that you know the sub-dirs names :
>
> For that, you need to list all sub-dirs inside *hdfs:///TestDirectory
> *using Scala, iterate over sub-dirs
> foreach sub-dir in the list
> read the partfiles , identify and attach schema respective to that
> sub-directory.
>
> 2) If you don't know the sub-directory names:
> You need to store schema somewhere inside that sub-directory and read
> it in iteration.
>
> On Fri, Feb 19, 2016 at 3:44 PM, Divya Gehlot 
> wrote:
>
>> Hi,
>> I have a use case ,where I have one parent directory
>>
>> File structure looks like
>> hdfs:///TestDirectory/spark1/part files( created by some spark job )
>> hdfs:///TestDirectory/spark2/ part files (created by some spark job )
>>
>> spark1 and spark 2 has different schema
>>
>> like spark 1  part files schema
>> carname model year
>>
>> Spark2 part files schema
>> carowner city  carcost
>>
>>
>> As these spark 1 and spark2 directory gets created dynamically
>> can have spark3 directory with different schema
>>
>> My requirement is to read the parent directory, list the subdirectories,
>> and create a dataframe for each subdirectory.
>>
>> I am not able to get how can I list subdirectory under parent directory
>> and dynamically create dataframes.
>>
>> Thanks,
>> Divya
>>
>>
>>
>>
>>
>


Re: Checking for null values when mapping

2016-02-20 Thread Chandeep Singh
Ah. Ok.

> On Feb 20, 2016, at 2:31 PM, Mich Talebzadeh  wrote:
> 
> Yes I did that as well but no joy. My shell does it for windows files 
> automatically
>  
> Thanks, 
>  
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
>  
>  
> From: Chandeep Singh [mailto:c...@chandeep.com] 
> Sent: 20 February 2016 14:27
> To: Mich Talebzadeh 
> Cc: user @spark 
> Subject: Re: Checking for null values when mapping
>  
> Also, have you looked into Dos2Unix (http://dos2unix.sourceforge.net/ 
> )
>  
> Has helped me in the past to deal with special characters while using windows 
> based CSV’s in Linux. (Might not be the solution here.. Just an FYI :))
>  
>> On Feb 20, 2016, at 2:17 PM, Chandeep Singh > > wrote:
>>  
>> Understood. In that case Ted’s suggestion to check the length should solve 
>> the problem.
>>  
>>> On Feb 20, 2016, at 2:09 PM, Mich Talebzadeh >> > wrote:
>>>  
>>> Hi,
>>>  
>>> That is a good question.
>>>  
>>> When data is exported from CSV to Linux, any character that cannot be 
>>> transformed is replaced by ?. That question mark is not actually the 
>>> expected “?” J
>>>  
>>> So the only way I can get rid of it is by dropping the first character
>>> using substring(1). I checked; I did the same in Hive SQL.
>>>  
>>> The actual field in CSV is "£2,500.00" that translates into "?2,500.00"
>>>  
>>> HTH
>>>  
>>>  
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> 
>>>  
>>> http://talebzadehmich.wordpress.com 
>>>  
>>>  
>>>  
>>> From: Chandeep Singh [mailto:c...@chandeep.com ] 
>>> Sent: 20 February 2016 13:47
>>> To: Mich Talebzadeh >
>>> Cc: user @spark >
>>> Subject: Re: Checking for null values when mapping
>>>  
>>> Looks like you’re using substring just to get rid of the ‘?’. Why not use 
>>> replace for that as well? And then you wouldn’t run into issues with index 
>>> out of bound.
>>>  
>>> val a = "?1,187.50"  
>>> val b = ""
>>>  
>>> println(a.substring(1).replace(",", ""))
>>> —> 1187.50
>>>  
>>> println(a.replace("?", "").replace(",", ""))
>>> —> 1187.50
>>>  
>>> println(b.replace("?", "").replace(",", ""))
>>> —> No error / output since both ‘?' and ‘,' don’t exist.
>>>  
>>>  
 On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh > wrote:
  
  
 I have a DF like below reading a csv file
  
  
 val df = 
 HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
 "true").option("header", "true").load("/data/stg/table2")
  
 val a = df.map(x => (x.getString(0), x.getString(1), 
 x.getString(2).substring(1).replace(",", 
 "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
 x.getString(4).substring(1).replace(",", "").toDouble))
  
  
 For most rows I am reading from csv file the above mapping works fine. 
 However, at the bottom of csv there are couple of empty columns as below
  
 [421,02/10/2015,?1,187.50,?237.50,?1,425.00]
 []
 [Net income,,?182,531.25,?14,606.25,?197,137.50]
 

Re: Checking for null values when mapping

2016-02-20 Thread Chandeep Singh
Also, have you looked into Dos2Unix (http://dos2unix.sourceforge.net/ 
)

Has helped me in the past to deal with special characters while using windows 
based CSV’s in Linux. (Might not be the solution here.. Just an FYI :))

> On Feb 20, 2016, at 2:17 PM, Chandeep Singh  wrote:
> 
> Understood. In that case Ted’s suggestion to check the length should solve 
> the problem.
> 
>> On Feb 20, 2016, at 2:09 PM, Mich Talebzadeh > > wrote:
>> 
>> Hi,
>>  
>> That is a good question.
>>  
>> When data is exported from CSV to Linux, any character that cannot be 
>> transformed is replaced by ?. That question mark is not actually the 
>> expected “?” J
>>  
>> So the only way I can get rid of it is by dropping the first character using
>> substring(1). I checked; I did the same in Hive SQL.
>>  
>> The actual field in CSV is "£2,500.00" that translates into "?2,500.00"
>>  
>> HTH
>>  
>>  
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> 
>>  
>> http://talebzadehmich.wordpress.com 
>>  
>>  
>>  
>> From: Chandeep Singh [mailto:c...@chandeep.com ] 
>> Sent: 20 February 2016 13:47
>> To: Mich Talebzadeh >
>> Cc: user @spark >
>> Subject: Re: Checking for null values when mapping
>>  
>> Looks like you’re using substring just to get rid of the ‘?’. Why not use 
>> replace for that as well? And then you wouldn’t run into issues with index 
>> out of bound.
>>  
>> val a = "?1,187.50"  
>> val b = ""
>>  
>> println(a.substring(1).replace(",", ""))
>> —> 1187.50
>>  
>> println(a.replace("?", "").replace(",", ""))
>> —> 1187.50
>>  
>> println(b.replace("?", "").replace(",", ""))
>> —> No error / output since both ‘?' and ‘,' don’t exist.
>>  
>>  
>>> On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh >> > wrote:
>>>  
>>>  
>>> I have a DF like below reading a csv file
>>>  
>>>  
>>> val df = 
>>> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
>>> "true").option("header", "true").load("/data/stg/table2")
>>>  
>>> val a = df.map(x => (x.getString(0), x.getString(1), 
>>> x.getString(2).substring(1).replace(",", 
>>> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
>>> x.getString(4).substring(1).replace(",", "").toDouble))
>>>  
>>>  
>>> For most rows I am reading from csv file the above mapping works fine. 
>>> However, at the bottom of csv there are couple of empty columns as below
>>>  
>>> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]
>>> []
>>> [Net income,,?182,531.25,?14,606.25,?197,137.50]
>>> []
>>> [year 2014,,?113,500.00,?0.00,?113,500.00]
>>> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>>>  
>>> However, I get 
>>>  
>>> a.collect.foreach(println)
>>> 16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 
>>> 161)
>>> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>>>  
>>> I suspect the cause is substring operation  say x.getString(2).substring(1) 
>>> on empty values that according to web will throw this type of error
>>>  
>>>  
>>> The easiest solution seems to be to check whether x above is not null and 
>>> do the substring operation. Can this be done without using a UDF?
>>>  
>>> Thanks
>>>  
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> 
>>>  
>>> http://talebzadehmich.wordpress.com 
>>>  
>>> NOTE: The information in this email is proprietary and confidential. This 
>>> message is for the designated recipient only, if you are not the intended 
>>> recipient, you should destroy it immediately. Any information in this 
>>> message shall not be understood as given or endorsed by Peridale Technology 
>>> Ltd, its subsidiaries or their employees, unless expressly so stated. It is 
>>> the responsibility of the recipient to ensure that this 

RE: Checking for null values when mapping

2016-02-20 Thread Mich Talebzadeh
Yes I did that as well but no joy. My shell does it for windows files 
automatically

 

Thanks, 

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com  

 


 

 

From: Chandeep Singh [mailto:c...@chandeep.com] 
Sent: 20 February 2016 14:27
To: Mich Talebzadeh 
Cc: user @spark 
Subject: Re: Checking for null values when mapping

 

Also, have you looked into Dos2Unix (http://dos2unix.sourceforge.net/)

 

Has helped me in the past to deal with special characters while using windows 
based CSV’s in Linux. (Might not be the solution here.. Just an FYI :))

 

On Feb 20, 2016, at 2:17 PM, Chandeep Singh  > wrote:

 

Understood. In that case Ted’s suggestion to check the length should solve the 
problem.

 

On Feb 20, 2016, at 2:09 PM, Mich Talebzadeh  > wrote:

 

Hi,

 

That is a good question.

 

When data is exported from CSV to Linux, any character that cannot be 
transformed is replaced by ?. That question mark is not actually the expected 
“?” :)

 

So the only way I can get rid of it is by dropping the first character using
substring(1). I checked; I did the same in Hive SQL.

 

The actual field in CSV is "£2,500.00" that translates into "?2,500.00"

 

HTH

 

 

Dr Mich Talebzadeh

 

LinkedIn   

 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

  http://talebzadehmich.wordpress.com

 


 

 

From: Chandeep Singh [mailto:c...@chandeep.com] 
Sent: 20 February 2016 13:47
To: Mich Talebzadeh  >
Cc: user @spark  >
Subject: Re: Checking for null values when mapping

 

Looks like you’re using substring just to get rid of the ‘?’. Why not use 
replace for that as well? And then you wouldn’t run into issues with index out 
of bound.

 

val a = "?1,187.50"  

val b = ""

 

println(a.substring(1).replace(",", ""))

—> 1187.50

 

println(a.replace("?", "").replace(",", ""))

—> 1187.50

 

println(b.replace("?", "").replace(",", ""))

—> No error / output since both ‘?' and ‘,' don’t exist.

 

 

On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh <  
m...@peridale.co.uk> wrote:

 

 

I have a DF like below reading a csv file

 

 

val df = 
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")

 

val a = df.map(x => (x.getString(0), x.getString(1), 
x.getString(2).substring(1).replace(",", 
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))

 

 

For most rows I am reading from csv file the above mapping works fine. However, 
at the bottom of csv there are couple of empty columns as below

 

[421,02/10/2015,?1,187.50,?237.50,?1,425.00]

[]

[Net income,,?182,531.25,?14,606.25,?197,137.50]

[]

[year 2014,,?113,500.00,?0.00,?113,500.00]

[Year 2015,,?69,031.25,?14,606.25,?83,637.50]

 

However, I get 

 

a.collect.foreach(println)

16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 161)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

I suspect the cause is substring operation  say x.getString(2).substring(1) on 
empty values that according to web will throw this type of error

 

 

The easiest solution seems to be to check whether x above is not null and do 
the substring operation. Can this be done without using a UDF?

 


Re: Checking for null values when mapping

2016-02-20 Thread Chandeep Singh
Understood. In that case Ted’s suggestion to check the length should solve the 
problem.
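
A sketch of that length check (the column name is taken from the thread):

import org.apache.spark.sql.functions.{col, length}

// keep only rows whose first column is non-empty
val nonEmpty = df.filter(length(col("Invoice Number")) > 0)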

> On Feb 20, 2016, at 2:09 PM, Mich Talebzadeh  wrote:
> 
> Hi,
>  
> That is a good question.
>  
> When data is exported from CSV to Linux, any character that cannot be 
> transformed is replaced by ?. That question mark is not actually the expected 
> “?” J
>  
> So the only way I can get rid of it is by dropping the first character using
> substring(1). I checked; I did the same in Hive SQL.
>  
> The actual field in CSV is "£2,500.00" that translates into "?2,500.00"
>  
> HTH
>  
>  
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
>  
>  
> From: Chandeep Singh [mailto:c...@chandeep.com] 
> Sent: 20 February 2016 13:47
> To: Mich Talebzadeh 
> Cc: user @spark 
> Subject: Re: Checking for null values when mapping
>  
> Looks like you’re using substring just to get rid of the ‘?’. Why not use 
> replace for that as well? And then you wouldn’t run into issues with index 
> out of bound.
>  
> val a = "?1,187.50"  
> val b = ""
>  
> println(a.substring(1).replace(",", ""))
> —> 1187.50
>  
> println(a.replace("?", "").replace(",", ""))
> —> 1187.50
>  
> println(b.replace("?", "").replace(",", ""))
> —> No error / output since both ‘?' and ‘,' don’t exist.
>  
>  
>> On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh > > wrote:
>>  
>>  
>> I have a DF like below reading a csv file
>>  
>>  
>> val df = 
>> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
>> "true").option("header", "true").load("/data/stg/table2")
>>  
>> val a = df.map(x => (x.getString(0), x.getString(1), 
>> x.getString(2).substring(1).replace(",", 
>> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
>> x.getString(4).substring(1).replace(",", "").toDouble))
>>  
>>  
>> For most rows I am reading from csv file the above mapping works fine. 
>> However, at the bottom of csv there are couple of empty columns as below
>>  
>> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]
>> []
>> [Net income,,?182,531.25,?14,606.25,?197,137.50]
>> []
>> [year 2014,,?113,500.00,?0.00,?113,500.00]
>> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>>  
>> However, I get 
>>  
>> a.collect.foreach(println)
>> 16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 
>> 161)
>> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>>  
>> I suspect the cause is substring operation  say x.getString(2).substring(1) 
>> on empty values that according to web will throw this type of error
>>  
>>  
>> The easiest solution seems to be to check whether x above is not null and do 
>> the substring operation. Can this be done without using a UDF?
>>  
>> Thanks
>>  
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> 
>>  
>> http://talebzadehmich.wordpress.com 
>>  



RE: Checking for null values when mapping

2016-02-20 Thread Mich Talebzadeh
Hi,

 

That is a good question.

 

When data is exported from CSV to Linux, any character that cannot be 
transformed is replaced by ?. That question mark is not actually the expected 
“?” :) 

 

So the only way I can get rid of it is by dropping the first character using
substring(1). I checked; I did the same in Hive SQL.

 

The actual field in CSV is "£2,500.00" that translates into "?2,500.00"

 

HTH

 

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com  

 


 

 

From: Chandeep Singh [mailto:c...@chandeep.com] 
Sent: 20 February 2016 13:47
To: Mich Talebzadeh 
Cc: user @spark 
Subject: Re: Checking for null values when mapping

 

Looks like you’re using substring just to get rid of the ‘?’. Why not use 
replace for that as well? And then you wouldn’t run into issues with index out 
of bound.

 

val a = "?1,187.50"  

val b = ""

 

println(a.substring(1).replace(",", ""))

—> 1187.50

 

println(a.replace("?", "").replace(",", "”))

—> 1187.50

 

println(b.replace("?", "").replace(",", "”))

—> No error / output since both ‘?' and ‘,' don’t exist.

 

 

On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh  > wrote:

 

 

I have a DF like below reading a csv file

 

 

val df = 
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")

 

val a = df.map(x => (x.getString(0), x.getString(1), 
x.getString(2).substring(1).replace(",", 
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))

 

 

For most rows I am reading from csv file the above mapping works fine. However, 
at the bottom of csv there are couple of empty columns as below

 

[421,02/10/2015,?1,187.50,?237.50,?1,425.00]

[]

[Net income,,?182,531.25,?14,606.25,?197,137.50]

[]

[year 2014,,?113,500.00,?0.00,?113,500.00]

[Year 2015,,?69,031.25,?14,606.25,?83,637.50]

 

However, I get 

 

a.collect.foreach(println)

16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 161)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

I suspect the cause is substring operation  say x.getString(2).substring(1) on 
empty values that according to web will throw this type of error

 

 

The easiest solution seems to be to check whether x above is not null and do 
the substring operation. Can this be done without using a UDF?

 

Thanks

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com  

 

NOTE: The information in this email is proprietary and confidential. This 
message is for the designated recipient only, if you are not the intended 
recipient, you should destroy it immediately. Any information in this message 
shall not be understood as given or endorsed by Peridale Technology Ltd, its 
subsidiaries or their employees, unless expressly so stated. It is the 
responsibility of the recipient to ensure that this email is virus free, 
therefore neither Peridale Technology Ltd, its subsidiaries nor their employees 
accept any responsibility.

 



Re: Checking for null values when mapping

2016-02-20 Thread Chandeep Singh
Looks like you’re using substring just to get rid of the ‘?’. Why not use 
replace for that as well? Then you wouldn’t run into index-out-of-bounds issues.

val a = "?1,187.50"  
val b = ""

println(a.substring(1).replace(",", ""))
—> 1187.50

println(a.replace("?", "").replace(",", "”))
—> 1187.50

println(b.replace("?", "").replace(",", "”))
—> No error / output since both ‘?' and ‘,' don’t exist.


> On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh  wrote:
> 
>  
> I have a DF like below reading a csv file
>  
>  
> val df = 
> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
> "true").option("header", "true").load("/data/stg/table2")
>  
> val a = df.map(x => (x.getString(0), x.getString(1), 
> x.getString(2).substring(1).replace(",", 
> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
> x.getString(4).substring(1).replace(",", "").toDouble))
>  
>  
> For most rows I am reading from csv file the above mapping works fine. 
> However, at the bottom of csv there are couple of empty columns as below
>  
> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]
> []
> [Net income,,?182,531.25,?14,606.25,?197,137.50]
> []
> [year 2014,,?113,500.00,?0.00,?113,500.00]
> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>  
> However, I get 
>  
> a.collect.foreach(println)
> 16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 
> 161)
> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>  
> I suspect the cause is substring operation  say x.getString(2).substring(1) 
> on empty values that according to web will throw this type of error
>  
>  
> The easiest solution seems to be to check whether x above is not null and do 
> the substring operation. Can this be done without using a UDF?
>  
> Thanks
>  
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> NOTE: The information in this email is proprietary and confidential. This 
> message is for the designated recipient only, if you are not the intended 
> recipient, you should destroy it immediately. Any information in this message 
> shall not be understood as given or endorsed by Peridale Technology Ltd, its 
> subsidiaries or their employees, unless expressly so stated. It is the 
> responsibility of the recipient to ensure that this email is virus free, 
> therefore neither Peridale Technology Ltd, its subsidiaries nor their 
> employees accept any responsibility.



Re: Streaming with broadcast joins

2016-02-20 Thread Srikanth
Sebastian,

*Update:* This is not possible, and it will probably remain this way for the
foreseeable future.
https://issues.apache.org/jira/browse/SPARK-3863
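
For the record, the workaround Sebastian describes below (broadcasting a plain
Map instead of a DataFrame) looks roughly like this. A minimal sketch only: the
lookup file layout (id,name) and the "unknown" default are assumptions, and sc /
lines are the SparkContext and socket DStream from the code further down.

// build the lookup Map once on the driver and broadcast it to the executors
val lookup: Map[Int, String] = sc.textFile("file:///shared/data/test-data.txt")
  .map(_.split(","))
  .map(a => a(0).toInt -> a(1))
  .collectAsMap()
  .toMap
val lookupBc = sc.broadcast(lookup)

lines.foreachRDD { rdd =>
  // no join: each record is enriched locally from the broadcast Map
  val enriched = rdd.map(_.split(","))
    .map(l => (l(0).toInt, l(1), lookupBc.value.getOrElse(l(0).toInt, "unknown")))
  enriched.take(5).foreach(println)
}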

Srikanth

On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu 
wrote:

> I don't have the code with me now, and I ended moving everything to RDD in
> the end and using map operations to do some lookups, i.e. instead of
> broadcasting a Dataframe I ended broadcasting a Map
>
>
> On Fri, Feb 19, 2016 at 11:39 AM Srikanth  wrote:
>
>> It didn't fail. It wasn't broadcasting. I just ran the test again and
>> here are the logs.
>> Every batch is reading the metadata file.
>>
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>> If I remember, foreachRDD is executed in the driver's context. Not sure
>> how we'll be able to achieve broadcast in this approach(unless we use SQL
>> broadcast hint again)
>>
>> When you say "it worked before",  was it with an older version of spark?
>> I'm trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI
>> that broadcast join is being used. Also, if the files are read and
>> broadcasted each batch??
>>
>> Thanks for the help!
>>
>>
>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu 
>> wrote:
>>
>>> I don't see anything obviously wrong on your second approach, I've done
>>> it like that before and it worked. When you say that it didn't work what do
>>> you mean? did it fail? it didnt broadcast?
>>>
>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth  wrote:
>>>
 Code with SQL broadcast hint. This worked and I was able to see that
 broadcastjoin was performed.

 val testDF = sqlContext.read.format("com.databricks.spark.csv")

  .schema(schema).load("file:///shared/data/test-data.txt")

 val lines = ssc.socketTextStream("DevNode", )

 lines.foreachRDD((rdd, timestamp) => {
val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
 l(1))).toDF()
val resultDF = recordDF.join(testDF, "Age")

  
 resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
 }

 But for every batch this file was read and broadcast was performed.
 Evaluating the entire DAG I guess.
   16/02/18 12:24:02 INFO HadoopRDD: Input split:
 file:/shared/data/test-data.txt:27+28
 16/02/18 12:24:02 INFO HadoopRDD: Input split:
 file:/shared/data/test-data.txt:0+27

 16/02/18 12:25:00 INFO HadoopRDD: Input split:
 file:/shared/data/test-data.txt:27+28
 16/02/18 12:25:00 INFO HadoopRDD: Input split:
 file:/shared/data/test-data.txt:0+27


 Then I changed code to broadcast the dataframe. This didn't work
 either. Not sure if this is what you meant by broadcasting a dataframe.

 val testDF =
 sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")

  .schema(schema).load("file:///shared/data/test-data.txt")
  )

 val lines = ssc.socketTextStream("DevNode", )

 lines.foreachRDD((rdd, timestamp) => {
 val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
 l(1))).toDF()
 val resultDF = recordDF.join(testDF.value, "Age")

  
 resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
 }


 On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <
 sebastian@gmail.com> wrote:

> Can you paste the code where you use sc.broadcast ?
>
> On Thu, Feb 18, 2016 at 5:32 PM Srikanth 
> wrote:
>
>> Sebastian,
>>
>> I was able to broadcast using sql broadcast hint. Question is how to
>> prevent this broadcast for each RDD.
>> Is there a way where it can be broadcast once and used locally for
>> each RDD?
>> Right now every batch the metadata file is read and the DF is
>> broadcasted.
>> I tried sc.broadcast and that did not provide this behavior.
>>
>> Srikanth
>>
>>
>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <
>> sebastian@gmail.com> wrote:
>>
>>> You should be able to broadcast that data frame using sc.broadcast
>>> and join against it.
>>>
>>> On Wed, 17 Feb 2016, 21:13 Srikanth  wrote:
>>>
 Hello,

 I have a streaming use case where I plan to keep a dataset
 broadcasted and cached on each executor.
 Every micro batch in streaming will create a DF out of the 

Re: Checking for null values when mapping

2016-02-20 Thread Ted Yu
For #2, you can filter out rows whose first column has length 0.
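
A minimal sketch of that filter, assuming the column names from the quoted
message (the blank [] lines appear to come through as empty strings rather than
nulls, which is why isNotNull alone does not catch them):

import org.apache.spark.sql.functions.{col, length}

// keep only rows whose first column is non-empty before mapping
val cleaned = df.filter(length(col("Invoice Number")) > 0)
cleaned.count()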

Cheers

> On Feb 20, 2016, at 6:59 AM, Mich Talebzadeh  wrote:
> 
> Thanks
>  
>  
> So what I did was
>  
> scala> val df = 
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
> "true").option("header", "true").load("/data/stg/table2")
> df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment date: 
> string, Net: string, VAT: string, Total: string]
>  
>  
> scala> df.printSchema
> root
> |-- Invoice Number: string (nullable = true)
> |-- Payment date: string (nullable = true)
> |-- Net: string (nullable = true)
> |-- VAT: string (nullable = true)
> |-- Total: string (nullable = true)
>  
>  
> So all the columns are Strings
>  
> Then I tried to exclude null rows by filtering on all columns not being null 
> and map the rest
>  
> scala> val a = df.filter(col("Invoice Number").isNotNull and col("Payment 
> date").isNotNull and col("Net").isNotNull and col("VAT").isNotNull and 
> col("Total").isNotNull).map(x => 
> (x.getString(1),x.getString(2).substring(1).replace(",", 
> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
> x.getString(4).substring(1).replace(",", "").toDouble))
>  
> a: org.apache.spark.rdd.RDD[(String, Double, Double, Double)] = 
> MapPartitionsRDD[176] at map at :21
>  
> This still comes up with “String index out of range: “ error
>  
> 16/02/20 11:50:51 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 18)
> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>  
> My questions are:
>  
> 1.Doing the map,  map(x => (x.getString(1)  -- Can I replace 
> x.getString(1) with the actual column name say “Invoice Number” and so forth 
> for other columns as well?
> 2.   Sounds like it crashes because of these columns below at the end
> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]  \\ example good one
> [] \\ bad one, empty one
> [Net income,,?182,531.25,?14,606.25,?197,137.50]
> [] \\ bad one, empty one
> [year 2014,,?113,500.00,?0.00,?113,500.00]
> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>  
> 3.Also to clarify I want to drop those two empty line -> []  if I 
> can. Unfortunately  drop() call does not work
> a.drop()
> :24: error: value drop is not a member of 
> org.apache.spark.rdd.RDD[(String, Double, Double, Double)]
>   a.drop()
> ^
>  
> Thanka again,
>  
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
>  
> NOTE: The information in this email is proprietary and confidential. This 
> message is for the designated recipient only, if you are not the intended 
> recipient, you should destroy it immediately. Any information in this message 
> shall not be understood as given or endorsed by Peridale Technology Ltd, its 
> subsidiaries or their employees, unless expressly so stated. It is the 
> responsibility of the recipient to ensure that this email is virus free, 
> therefore neither Peridale Technology Ltd, its subsidiaries nor their 
> employees accept any responsibility.
>  
>  
> From: Michał Zieliński [mailto:zielinski.mich...@gmail.com] 
> Sent: 20 February 2016 08:59
> To: Mich Talebzadeh 
> Cc: user @spark 
> Subject: Re: Checking for null values when mapping
>  
> You can use filter and isNotNull on Column before the map.
>  
> On 20 February 2016 at 08:24, Mich Talebzadeh  wrote:
>  
> I have a DF like below reading a csv file
>  
>  
> val df = 
> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
> "true").option("header", "true").load("/data/stg/table2")
>  
> val a = df.map(x => (x.getString(0), x.getString(1), 
> x.getString(2).substring(1).replace(",", 
> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
> x.getString(4).substring(1).replace(",", "").toDouble))
>  
>  
> For most rows I am reading from csv file the above mapping works fine. 
> However, at the bottom of csv there are couple of empty columns as below
>  
> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]
> []
> [Net income,,?182,531.25,?14,606.25,?197,137.50]
> []
> [year 2014,,?113,500.00,?0.00,?113,500.00]
> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>  
> However, I get
>  
> a.collect.foreach(println)
> 16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 
> 161)
> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>  
> I suspect the cause is substring operation  say x.getString(2).substring(1) 
> on empty values that according to web will throw this type of error
>  
>  
> The easiest solution seems to be to check whether x above is not null and do 
> the substring operation. Can this be done without using a UDF?
>  
> Thanks
>  
> Dr Mich Talebzadeh
>  
> LinkedIn  
> 

RE: Checking for null values when mapping

2016-02-20 Thread Mich Talebzadeh
Thanks

 

 

So what I did was

 

scala> val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")

df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment date: 
string, Net: string, VAT: string, Total: string]

 

 

scala> df.printSchema

root

|-- Invoice Number: string (nullable = true)

|-- Payment date: string (nullable = true)

|-- Net: string (nullable = true)

|-- VAT: string (nullable = true)

|-- Total: string (nullable = true)

 

 

So all the columns are Strings

 

Then I tried to exclude null rows by filtering on all columns not being null 
and map the rest

 

scala> val a = df.filter(col("Invoice Number").isNotNull and col("Payment 
date").isNotNull and col("Net").isNotNull and col("VAT").isNotNull and 
col("Total").isNotNull).map(x => 
(x.getString(1),x.getString(2).substring(1).replace(",", 
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))

 

a: org.apache.spark.rdd.RDD[(String, Double, Double, Double)] = 
MapPartitionsRDD[176] at map at :21

 

This still comes up with “String index out of range: “ error

 

16/02/20 11:50:51 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 18)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

My questions are:

 

1. Doing the map, map(x => (x.getString(1) ... -- can I replace x.getString(1) 
with the actual column name, say “Invoice Number”, and so forth for the other 
columns as well? (See the sketch after question 3.)

2. Sounds like it crashes because of these empty lines at the end, shown below

[421,02/10/2015,?1,187.50,?237.50,?1,425.00]  \\ example good one

[] \\ bad one, empty one

[Net income,,?182,531.25,?14,606.25,?197,137.50] 

[] \\ bad one, empty one

[year 2014,,?113,500.00,?0.00,?113,500.00]

[Year 2015,,?69,031.25,?14,606.25,?83,637.50]

 

3. Also, to clarify, I want to drop those two empty lines -> [] if I can. 
Unfortunately the drop() call does not work:

a.drop()

:24: error: value drop is not a member of 
org.apache.spark.rdd.RDD[(String, Double, Double, Double)]

  a.drop()

^
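
A sketch of what questions 1 and 3 are after, using the column names from the
schema above (the length() check for the blank lines is an assumption about how
they are parsed):

import org.apache.spark.sql.functions.{col, length}

// 1. columns can be fetched by name with getAs instead of by position
// 3. there is no drop() on the resulting RDD; filter the blank lines out on the
//    DataFrame before mapping
val a = df.filter(length(col("Invoice Number")) > 0)
  .map(x => (x.getAs[String]("Payment date"),
    x.getAs[String]("Net").substring(1).replace(",", "").toDouble,
    x.getAs[String]("VAT").substring(1).replace(",", "").toDouble,
    x.getAs[String]("Total").substring(1).replace(",", "").toDouble))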

 

Thanks again,

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com  

 

NOTE: The information in this email is proprietary and confidential. This 
message is for the designated recipient only, if you are not the intended 
recipient, you should destroy it immediately. Any information in this message 
shall not be understood as given or endorsed by Peridale Technology Ltd, its 
subsidiaries or their employees, unless expressly so stated. It is the 
responsibility of the recipient to ensure that this email is virus free, 
therefore neither Peridale Technology Ltd, its subsidiaries nor their employees 
accept any responsibility.

 

 

From: Michał Zieliński [mailto:zielinski.mich...@gmail.com] 
Sent: 20 February 2016 08:59
To: Mich Talebzadeh 
Cc: user @spark 
Subject: Re: Checking for null values when mapping

 

You can use filter and isNotNull on Column before the map.

 

On 20 February 2016 at 08:24, Mich Talebzadeh  > wrote:

 

I have a DF like below reading a csv file

 

 

val df = 
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")

 

val a = df.map(x => (x.getString(0), x.getString(1), 
x.getString(2).substring(1).replace(",", 
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))

 

 

For most rows I am reading from csv file the above mapping works fine. However, 
at the bottom of csv there are couple of empty columns as below

 

[421,02/10/2015,?1,187.50,?237.50,?1,425.00]

[]

[Net income,,?182,531.25,?14,606.25,?197,137.50]

[]

[year 2014,,?113,500.00,?0.00,?113,500.00]

[Year 2015,,?69,031.25,?14,606.25,?83,637.50]

 

However, I get 

 

a.collect.foreach(println)

16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 161)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

I suspect the cause is substring operation  say x.getString(2).substring(1) on 
empty values that according to web will throw this type of error

 

 

The easiest solution seems to be to check whether x above is not null and do 
the substring operation. Can this be done without using a UDF?

 

Thanks

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com  

 

NOTE: The information in this email 

Is this likely to cause any problems?

2016-02-20 Thread Teng Qiu
@Daniel, there are at least 3 things that EMR cannot solve yet:
- HA support
- AWS provides an auto scaling feature, but scaling an EMR cluster up/down needs
manual operations
- security concerns in a public VPC

EMR is basically designed for short-running use cases with some pre-defined
bootstrap actions and steps, so mainly for scheduled querying processes; it is
not a good fit as a permanently running cluster for ad-hoc queries and
analytical work.

Therefore in our organization (an e-commerce company in Europe that most of you
may never have heard of :p but we have more than 1000 techies and 10k employees
now...), we made a solution for this:
https://github.com/zalando/spark-appliance

It enables HA with ZooKeeper, the nodes are in an auto scaling group and run in
private subnets, it provides a REST API secured with OAuth, and it is even
integrated with Jupyter notebook :)


On Saturday, 20 February 2016, Sabarish Sasidharan  wrote:
> EMR does cost more than vanilla EC2. Using spark-ec2 can result in
savings with large clusters, though that is not everybody's cup of tea.
>
> Regards
> Sab
>
> On 19-Feb-2016 7:55 pm, "Daniel Siegmann" 
wrote:
>>
>> With EMR supporting Spark, I don't see much reason to use the spark-ec2
script unless it is important for you to be able to launch clusters using
the bleeding edge version of Spark. EMR does seem to do a pretty decent job
of keeping up to date - the latest version (4.3.0) supports the latest
Spark version (1.6.0).
>>
>> So I'd flip the question around and ask: is there any reason to continue
using the spark-ec2 script rather than EMR?
>>
>> On Thu, Feb 18, 2016 at 11:39 AM, James Hammerton  wrote:
>>>
>>> I have now... So far  I think the issues I've had are not related to
this, but I wanted to be sure in case it should be something that needs to
be patched. I've had some jobs run successfully but this warning appears in
the logs.
>>> Regards,
>>> James
>>>
>>> On 18 February 2016 at 12:23, Ted Yu  wrote:

 Have you seen this ?
 HADOOP-10988

 Cheers
 On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton 
wrote:
>
> HI,
> I am seeing warnings like this in the logs when I run Spark jobs:
>
> OpenJDK 64-Bit Server VM warning: You have loaded library
/root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have
disabled stack guard. The VM will try to fix the stack guard now.
> It's highly recommended that you fix the library with 'execstack -c
', or link it with '-z noexecstack'.
>
> I used spark-ec2 to launch the cluster with the default AMI, Spark
1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
is m4.large.
> Could this contribute to any problems running the jobs?
> Regards,
> James
>>>
>>
>


Re: How to get the code for class in spark

2016-02-20 Thread Michał Zieliński
Probably you mean reflection:
https://stackoverflow.com/questions/2224251/reflection-on-a-scala-case-class
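
A small sketch of the runtime-reflection route (the Record case class here is
just an example):

import scala.reflect.runtime.universe._

case class Record(col1: String, col2: Int)

// list the constructor fields (case accessors) of the case class at runtime
val fields = typeOf[Record].members.collect {
  case m: MethodSymbol if m.isCaseAccessor => s"${m.name}: ${m.returnType}"
}
fields.foreach(println)   // e.g. col2: Int, col1: String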

On 19 February 2016 at 15:14, Ashok Kumar 
wrote:

> Hi,
>
> class body thanks
>
>
> On Friday, 19 February 2016, 11:23, Ted Yu  wrote:
>
>
> Can you clarify your question ?
>
> Did you mean the body of your class ?
>
> On Feb 19, 2016, at 4:43 AM, Ashok Kumar  > wrote:
>
> Hi,
>
> If I define a class in Scala like
>
> case class(col1: String, col2:Int,...)
>
> and it is created how would I be able to see its description anytime
>
> Thanks
>
>
>
>


Re: Checking for null values when mapping

2016-02-20 Thread Michał Zieliński
You can use filter and isNotNull on Column before the map.
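
For example (a sketch, assuming the header columns are named as in the rest of
the thread):

import org.apache.spark.sql.functions.col

// drop rows where the columns that get substring'd are null, then map
val a = df.filter(col("Net").isNotNull && col("VAT").isNotNull && col("Total").isNotNull)
  .map(x => (x.getString(0), x.getString(1),
    x.getString(2).substring(1).replace(",", "").toDouble,
    x.getString(3).substring(1).replace(",", "").toDouble,
    x.getString(4).substring(1).replace(",", "").toDouble))

Note that elsewhere in the thread the blank lines turn out to come through as
empty strings rather than nulls, so a length check on the columns may be needed
as well.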

On 20 February 2016 at 08:24, Mich Talebzadeh  wrote:

>
>
> I have a DF like below reading a csv file
>
>
>
>
>
> val df =
> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
>
>
>
> val a = df.map(x => (x.getString(0), x.getString(1),
> *x.getString(2).substring(1)*.replace(",",
> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble,
> x.getString(4).substring(1).replace(",", "").toDouble))
>
>
>
>
>
> For most rows I am reading from csv file the above mapping works fine.
> However, at the bottom of csv there are couple of empty columns as below
>
>
>
> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]
>
> []
>
> [Net income,,?182,531.25,?14,606.25,?197,137.50]
>
> []
>
> [year 2014,,?113,500.00,?0.00,?113,500.00]
>
> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>
>
>
> However, I get
>
>
>
> a.collect.foreach(println)
>
> 16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0
> (TID 161)
>
> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>
>
>
> I suspect the cause is substring operation  say
> x.getString(2).substring(1) on empty values that according to web will
> throw this type of error
>
>
>
>
>
> The easiest solution seems to be to check whether x above is not null and
> do the substring operation. Can this be done without using a UDF?
>
>
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this
> message shall not be understood as given or endorsed by Peridale Technology
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is
> the responsibility of the recipient to ensure that this email is virus
> free, therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>
>
>
>
>


Re: spark stages in parallel

2016-02-20 Thread Hemant Bhanawat
Not possible as of today. See
https://issues.apache.org/jira/browse/SPARK-2387

Hemant Bhanawat
https://www.linkedin.com/in/hemant-bhanawat-92a3811
www.snappydata.io

On Thu, Feb 18, 2016 at 1:19 PM, Shushant Arora 
wrote:

> can two stages of a single job run in parallel in Spark?
>
> e.g one stage is ,map transformation and another is repartition on mapped
> rdd.
>
> rdd.map(function,100).repartition(30);
>
> can it happen that, while the map transformation is running its 100 tasks and
> a few of them (say 10) have finished, Spark starts the repartition stage,
> which begins copying data from the mapped stage's nodes in parallel?
>
> Thanks
>


Checking for null values when mapping

2016-02-20 Thread Mich Talebzadeh
 

I have a DF like below reading a csv file

 

 

val df =
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg/table2")

 

val a = df.map(x => (x.getString(0), x.getString(1),
x.getString(2).substring(1).replace(",",
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble,
x.getString(4).substring(1).replace(",", "").toDouble))

 

 

For most rows I am reading from the csv file the above mapping works fine.
However, at the bottom of the csv there are a couple of empty lines, as below

 

[421,02/10/2015,?1,187.50,?237.50,?1,425.00]

[]

[Net income,,?182,531.25,?14,606.25,?197,137.50]

[]

[year 2014,,?113,500.00,?0.00,?113,500.00]

[Year 2015,,?69,031.25,?14,606.25,?83,637.50]

 

However, I get 

 

a.collect.foreach(println)

16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID
161)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

I suspect the cause is the substring operation, say x.getString(2).substring(1),
on empty values, which according to the web will throw this type of error

 

 

The easiest solution seems to be to check whether x above is not null and do
the substring operation. Can this be done without using a UDF?

 

Thanks

 

Dr Mich Talebzadeh

 

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

  http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This
message is for the designated recipient only, if you are not the intended
recipient, you should destroy it immediately. Any information in this
message shall not be understood as given or endorsed by Peridale Technology
Ltd, its subsidiaries or their employees, unless expressly so stated. It is
the responsibility of the recipient to ensure that this email is virus free,
therefore neither Peridale Technology Ltd, its subsidiaries nor their
employees accept any responsibility.