Re: Databricks Cloud vs AWS EMR

2016-01-28 Thread Eran Witkon
Can you name the features that make Databricks better than Zeppelin?
Eran
On Fri, 29 Jan 2016 at 01:37 Michal Klos  wrote:

> We use both databricks and emr. We use databricks for our exploratory /
> adhoc use cases because their notebook is pretty badass and better than
> Zeppelin IMHO.
>
> We use EMR for our production machine learning and ETL tasks. The nice
> thing about EMR is you can use applications other than spark. From a "tools
> in the toolbox" perspective this is very important.
>
> M
>
> On Jan 28, 2016, at 6:05 PM, Sourav Mazumder 
> wrote:
>
> You can also try out IBM's Spark as a service in IBM Bluemix. There you'll get
> all the required features for security, multitenancy, notebooks, and
> integration with other big data services. You can try it out for free too.
>
> Regards,
> Sourav
>
> On Thu, Jan 28, 2016 at 2:10 PM, Rakesh Soni 
> wrote:
>
>> At its core, EMR just launches Spark applications, whereas Databricks is
>>> a higher-level platform that also includes multi-user support, an
>>> interactive UI, security, and job scheduling.
>>>
>>> Specifically, Databricks runs standard Spark applications inside a
>>> user’s AWS account, similar to EMR, but it adds a variety of features to
>>> create an end-to-end environment for working with Spark. These include:
>>>
>>>
>>>- Interactive UI (includes a workspace with notebooks, dashboards, a
>>>  job scheduler, point-and-click cluster management)
>>>- Cluster sharing (multiple users can connect to the same cluster,
>>>  saving cost)
>>>- Security features (access controls to the whole workspace)
>>>- Collaboration (multi-user access to the same notebook, revision
>>>  control, and IDE and GitHub integration)
>>>- Data management (support for connecting different data sources to
>>>  Spark, caching service to speed up queries)
>>>
>>>
>>> The idea is that a lot of Spark deployments soon need to bring in
>>> multiple users, different types of jobs, etc, and we want to have these
>>> built-in. But if you just want to connect to existing data and run jobs,
>>> that also works.
>>>
>>> The cluster manager in Databricks is based on Standalone mode, not YARN,
>>> but Databricks adds several features, such as allowing multiple users to
>>> run commands on the same cluster and running multiple versions of Spark.
>>> Because Databricks is also the team that initially built Spark, the service
>>> is very up to date and integrated with the newest Spark features -- e.g.
>>> you can run previews of the next release, any data in Spark can be
>>> displayed visually, etc.
>>>
>>> *From: *Alex Nastetsky 
>>> *Subject: **Databricks Cloud vs AWS EMR*
>>> *Date: *January 26, 2016 at 11:55:41 AM PST
>>> *To: *user 
>>>
>>> As a user of AWS EMR (running Spark and MapReduce), I am interested in
>>> potential benefits that I may gain from Databricks Cloud. I was wondering
>>> if anyone has used both and done comparison / contrast between the two
>>> services.
>>>
>>> In general, which resource manager(s) does Databricks Cloud use for
>>> Spark? If it's YARN, can you also run MapReduce jobs in Databricks Cloud?
>>>
>>> Thanks.
>>>
>>> --
>>
>>
>>
>


Re: How to ignore case in dataframe groupby?

2015-12-30 Thread Eran Witkon
Drop the original column and rename the new column.
See df.drop & df.withColumnRenamed.
Eran
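
A minimal sketch of the drop-and-rename pattern suggested above, assuming the
DataFrame df and the countrycode column from this thread (upper comes from
org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.upper

// Add the uppercased column, drop the original, then give the new column the old name
val updated = df
  .withColumn("upper-code", upper(df("countrycode")))
  .drop("countrycode")
  .withColumnRenamed("upper-code", "countrycode")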
On Wed, 30 Dec 2015 at 19:08 raja kbv <raja...@yahoo.com> wrote:

> Solutions from Eran & Yanbo are working well. Thank you.
>
> @Eran,
>
> Your solution worked with a small change.
> DF.withColumn("upper-code",upper(df("countrycode"))).
>
> This creates a new column "upper-code". Is there a way to update the
> column in place or create a new df with the updated column?
>
> Thanks,
> Raja
>
> On Thursday, 24 December 2015 6:17 PM, Eran Witkon <eranwit...@gmail.com>
> wrote:
>
>
> Use DF.withColumn("upper-code", df("countrycode").toUpper)
> or just run a map function that does the same
>
> On Thu, Dec 24, 2015 at 2:05 PM Bharathi Raja <raja...@yahoo.com.invalid>
> wrote:
>
> Hi,
> Values in a dataframe column named countrycode are in different cases. Eg:
> (US, us).  groupBy & count gives two rows but the requirement is to ignore
> case for this operation.
> 1) Is there a way to ignore case in groupBy? Or
> 2) Is there a way to update the dataframe column countrycode to uppercase?
>
> Thanks in advance.
>
> Regards,
> Raja
>
>
>
>


Re: How can I get the column data based on specific column name and then stored these data in array or list ?

2015-12-25 Thread Eran Witkon
If you drop the other columns (or map to a new df with only that column) and
call collect, I think you will get what you want.
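
A minimal sketch of both suggestions, assuming a DataFrame df and a column
named "myCol" (the column name is only illustrative); note that collect()
pulls everything to the driver, so this only suits small results:

// Keep only the wanted column, then collect its values to the driver
val values: Array[String] = df.select("myCol")
  .map(_.getString(0))   // each Row has a single field here
  .collect()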
On Fri, 25 Dec 2015 at 10:26 fightf...@163.com  wrote:

> Emm...I think you can do a df.map and store each column value to your list.
>
> --
> fightf...@163.com
>
>
> *From:* zml张明磊 
> *Sent:* 2015-12-25 15:33
> *To:* user@spark.apache.org
> *Cc:* dev-subscr...@spark.apache.org
> *Subject:* How can I get the column data based on a specific column name and
> then store the data in an array or list?
>
> Hi,
>
>
>
>I am new to Scala and Spark and am trying to find the relevant API on
> DataFrame to solve the problem described in the title. However, the only API I
> have found is *DataFrame.col(colName: String): Column*, which returns a Column
> object, not its contents. An API like *Column.toArray: Type* would be enough
> for me, but DataFrame doesn't provide one. How can I achieve this?
>
>
>
> Thanks,
>
> Minglei.
>
>


Extract compressed JSON within JSON

2015-12-24 Thread Eran Witkon
Hi,

I have a JSON file with the following row format:
{"cty":"United
Kingdom","gzip":"H4sIAKtWystVslJQcs4rLVHSUUouqQTxQvMyS1JTFLwz89JT8nOB4hnFqSBxj/zS4lSF/DQFl9S83MSibKBMZVExSMbQwNBM19DA2FSpFgDvJUGVUw==","nm":"Edmund
lronside","yrs":"1016"}

The gzip field is itself a compressed JSON string.

I want to read the file and build the full nested JSON as a row:

{"cty":"United Kingdom","hse":{"nm": "Cnut","cty": "United
Kingdom","hse": "House of Denmark","yrs": "1016-1035"},"nm":"Edmund
lronside","yrs":"1016"}

I already have a function which extracts the compressed field to a string.
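
For reference, a sketch of what such a helper might look like, assuming the
gzip field is a Base64-encoded GZIP stream (as the sample value suggests);
this is an illustration, not the actual function mentioned above:

import java.io.ByteArrayInputStream
import java.util.Base64
import java.util.zip.GZIPInputStream
import scala.io.Source
import scala.util.Try

object GZipHelper {
  // Decode the Base64 text, gunzip it, and return the embedded JSON string
  def unCompress(b64: String): Try[String] = Try {
    val in = new GZIPInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(b64)))
    try Source.fromInputStream(in, "UTF-8").mkString finally in.close()
  }
}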

Questions:

*If I use the following code to build the RDD:*

val jsonData = sqlContext.read.json(sourceFilesPath)
//
//loop through the DataFrame and manipulate the gzip Filed

val jsonUnGzip = jsonData.map(r => Row(r.getString(0),
GZipHelper.unCompress(r.getString(1)).get, r.getString(2),
r.getString(3)))

*I get a row with 4 columns (String,String,String,String)*

 org.apache.spark.sql.Row = [United Kingdom,{"nm": "Cnut","cty":
"United Kingdom","hse": "House of Denmark","yrs": "1016-1035"},Edmund
lronside,1016]

*Now, I can't tell Spark to "re-parse" Col(1) as JSON, right?*

I've seen some posts about using case classes or explode, but I don't
understand how they can help here.

Eran


Re: How to Parse & flatten JSON object in a text file using Spark into Dataframe

2015-12-24 Thread Eran Witkon
Raja! I found the answer to your question!
Look at
http://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes
This is what you (and I) were looking for.
The general idea: you read the file as text, with ProjectDetails as just a
string field, then you build the JSON string representation of the whole
line; that gives you a nested JSON schema which Spark SQL can read.

Eran
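
To make that concrete, a hedged sketch of the two steps: rebuild each line as
a JSON string, then flatten the project array with explode. All names here
follow the example in the thread and are assumptions, and jsonLines is an
assumed RDD[String] of the rebuilt JSON records:

import org.apache.spark.sql.functions.explode

// Spark SQL infers the nested schema from the rebuilt JSON strings
val nested = sqlContext.read.json(jsonLines)

// explode turns each element of the ProjectDetails array into its own row
val flat = nested
  .select(nested("employeeID"), nested("Name"),
          explode(nested("ProjectDetails")).as("project"))
  .select("employeeID", "Name",
          "project.ProjectName", "project.Description",
          "project.Duration", "project.Role")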

On Thu, Dec 24, 2015 at 10:26 AM Eran Witkon <eranwit...@gmail.com> wrote:

> I don't have the exact answer for you but I would look for something using
> explode method on DataFrame
>
> On Thu, Dec 24, 2015 at 7:34 AM Bharathi Raja <raja...@yahoo.com> wrote:
>
>> Thanks Gokul, but the file I have has the same format as I mentioned. The
>> first two columns are not in JSON format.
>>
>> Thanks,
>> Raja
>> --
>> From: Gokula Krishnan D <email2...@gmail.com>
>> Sent: ‎12/‎24/‎2015 2:44 AM
>> To: Eran Witkon <eranwit...@gmail.com>
>> Cc: raja kbv <raja...@yahoo.com>; user@spark.apache.org
>>
>> Subject: Re: How to Parse & flatten JSON object in a text file using
>> Spark  into Dataframe
>>
>> You can try this, but I slightly modified the input structure since the
>> first two columns were not in JSON format.
>>
>> [image: Inline image 1]
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>>
>> On Wed, Dec 23, 2015 at 9:46 AM, Eran Witkon <eranwit...@gmail.com>
>> wrote:
>>
>>> Did you get a solution for this?
>>>
>>> On Tue, 22 Dec 2015 at 20:24 raja kbv <raja...@yahoo.com.invalid> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am new to spark.
>>>>
>>>> I have a text file with below structure.
>>>>
>>>>
>>>> (employeeID: Int, Name: String, ProjectDetails:
>>>> JsonObject{[{ProjectName, Description, Duration, Role}]})
>>>> Eg:
>>>> (123456, Employee1, {“ProjectDetails”:[
>>>>  {
>>>> “ProjectName”: “Web Develoement”, “Description” : “Online Sales website”,
>>>> “Duration” : “6 Months” , “Role” : “Developer”}
>>>>  {
>>>> “ProjectName”: “Spark Develoement”, “Description” : “Online Sales
>>>> Analysis”, “Duration” : “6 Months” , “Role” : “Data Engineer”}
>>>>  {
>>>> “ProjectName”: “Scala Training”, “Description” : “Training”, “Duration” :
>>>> “1 Month” }
>>>>   ]
>>>> }
>>>>
>>>>
>>>> Could someone help me to parse & flatten the record as below dataframe
>>>> using scala?
>>>>
>>>> employeeID,Name, ProjectName, Description, Duration, Role
>>>> 123456, Employee1, Web Develoement, Online Sales website, 6 Months ,
>>>> Developer
>>>> 123456, Employee1, Spark Develoement, Online Sales Analysis, 6 Months,
>>>> Data Engineer
>>>> 123456, Employee1, Scala Training, Training, 1 Month, null
>>>>
>>>>
>>>> Thank you in advance.
>>>>
>>>> Regards,
>>>> Raja
>>>>
>>>
>>


Re: Extract compressed JSON within JSON

2015-12-24 Thread Eran Witkon
Answered via StackOverflow. If you are looking for the solution, this is the
trick:


val jsonNested = sqlContext.read.json(jsonUnGzip.map{case
Row(cty:String, json:String,nm:String,yrs:String) => s"""{"cty":
\"$cty\", "extractedJson": $json , "nm": \"$nm\" , "yrs":
\"$yrs\"}"""})

See this link for source
http://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes

Eran


On Thu, Dec 24, 2015 at 11:42 AM Eran Witkon <eranwit...@gmail.com> wrote:

> Hi,
>
> I have a JSON file with the following row format:
> {"cty":"United
> Kingdom","gzip":"H4sIAKtWystVslJQcs4rLVHSUUouqQTxQvMyS1JTFLwz89JT8nOB4hnFqSBxj/zS4lSF/DQFl9S83MSibKBMZVExSMbQwNBM19DA2FSpFgDvJUGVUw==","nm":"Edmund
> lronside","yrs":"1016"}
>
> The gzip field is itself a compressed JSON string.
>
> I want to read the file and build the full nested JSON as a row:
>
> {"cty":"United Kingdom","hse":{"nm": "Cnut","cty": "United Kingdom","hse": 
> "House of Denmark","yrs": "1016-1035"},"nm":"Edmund lronside","yrs":"1016"}
>
> I already have a function which extracts the compressed field to a string.
>
> Questions:
>
> *If I use the following code to build the RDD:*
>
> val jsonData = sqlContext.read.json(sourceFilesPath)
> //
> //loop through the DataFrame and manipulate the gzip Filed
>
> val jsonUnGzip = jsonData.map(r => Row(r.getString(0), 
> GZipHelper.unCompress(r.getString(1)).get, r.getString(2), r.getString(3)))
>
> *I get a row with 4 columns (String,String,String,String)*
>
>  org.apache.spark.sql.Row = [United Kingdom,{"nm": "Cnut","cty": "United 
> Kingdom","hse": "House of Denmark","yrs": "1016-1035"},Edmund lronside,1016]
>
> *Now, I can't tell Spark to "re-parse" Col(1) as JSON, right?*
>
> I've seen some posts about using case classes or explode, but I don't
> understand how they can help here.
>
> Eran
>
>


Re: How to ignore case in dataframe groupby?

2015-12-24 Thread Eran Witkon
Use DF.withColumn("upper-code", df("countrycode").toUpper)
or just run a map function that does the same
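
A hedged sketch of option (1) from the question quoted below, grouping
case-insensitively without changing the stored values (df is the DataFrame
from the thread):

import org.apache.spark.sql.functions.upper

// Group on the uppercased value so "US" and "us" fall into one bucket
val counts = df.groupBy(upper(df("countrycode")).as("countrycode")).count()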

On Thu, Dec 24, 2015 at 2:05 PM Bharathi Raja 
wrote:

> Hi,
> Values in a dataframe column named countrycode are in different cases. Eg:
> (US, us).  groupBy & count gives two rows but the requirement is to ignore
> case for this operation.
> 1) Is there a way to ignore case in groupBy? Or
> 2) Is there a way to update the dataframe column countrycode to uppercase?
>
> Thanks in advance.
>
> Regards,
> Raja
>


Re: How to Parse & flatten JSON object in a text file using Spark & Scala into Dataframe

2015-12-23 Thread Eran Witkon
Did you get a solution for this?
On Tue, 22 Dec 2015 at 20:24 raja kbv  wrote:

> Hi,
>
> I am new to spark.
>
> I have a text file with below structure.
>
>
> (employeeID: Int, Name: String, ProjectDetails: JsonObject{[{ProjectName,
> Description, Duration, Role}]})
> Eg:
> (123456, Employee1, {“ProjectDetails”:[
>  { “ProjectName”:
> “Web Develoement”, “Description” : “Online Sales website”, “Duration” : “6
> Months” , “Role” : “Developer”}
>  { “ProjectName”:
> “Spark Develoement”, “Description” : “Online Sales Analysis”, “Duration” :
> “6 Months” , “Role” : “Data Engineer”}
>  { “ProjectName”:
> “Scala Training”, “Description” : “Training”, “Duration” : “1 Month” }
>   ]
> }
>
>
> Could someone help me to parse & flatten the record as below dataframe
> using scala?
>
> employeeID,Name, ProjectName, Description, Duration, Role
> 123456, Employee1, Web Develoement, Online Sales website, 6 Months ,
> Developer
> 123456, Employee1, Spark Develoement, Online Sales Analysis, 6 Months,
> Data Engineer
> 123456, Employee1, Scala Training, Training, 1 Month, null
>
>
> Thank you in advance.
>
> Regards,
> Raja
>


Re: Using IntelliJ for Spark development

2015-12-23 Thread Eran Witkon
Thanks, so based on that article, should I use sbt or maven? Or either?
Eran
On Wed, 23 Dec 2015 at 13:05 Akhil Das <ak...@sigmoidanalytics.com> wrote:

> You will have to point to your spark-assembly.jar since spark has a lot of
> dependencies. You can read the answers discussed over here to have a better
> understanding
> http://stackoverflow.com/questions/3589562/why-maven-what-are-the-benefits
>
> Thanks
> Best Regards
>
> On Wed, Dec 23, 2015 at 4:27 PM, Eran Witkon <eranwit...@gmail.com> wrote:
>
>> Thanks, all of these examples show how to link to the Spark source and build
>> it as part of my project. Why should I do that? Why not point directly to
>> my spark.jar?
>> Am I missing something?
>> Eran
>>
>> On Wed, Dec 23, 2015 at 9:59 AM Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> 1. Install sbt plugin on IntelliJ
>>> 2. Create a new project/Import an sbt project like Dean suggested
>>> 3. Happy Debugging.
>>>
>>> You can also refer to this article for more information
>>> https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-IntelliJ
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Dec 21, 2015 at 10:51 PM, Eran Witkon <eranwit...@gmail.com>
>>> wrote:
>>>
>>>> Any pointers on how to use IntelliJ for Spark development?
>>>> Any way to use a Scala worksheet to run like spark-shell?
>>>>
>>>
>>>
>


Re: Using IntelliJ for Spark development

2015-12-23 Thread Eran Witkon
Thanks, all of these examples show how to link to the Spark source and build
it as part of my project. Why should I do that? Why not point directly to
my spark.jar?
Am I missing something?
Eran
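
As an alternative to linking the Spark sources, a minimal build.sbt sketch
that simply declares Spark as a provided library dependency (the project name
and version numbers are assumptions matching this thread's era):

name := "my-spark-app"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.5.2" % "provided"
)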

On Wed, Dec 23, 2015 at 9:59 AM Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> 1. Install sbt plugin on IntelliJ
> 2. Create a new project/Import an sbt project like Dean suggested
> 3. Happy Debugging.
>
> You can also refer to this article for more information
> https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-IntelliJ
>
> Thanks
> Best Regards
>
> On Mon, Dec 21, 2015 at 10:51 PM, Eran Witkon <eranwit...@gmail.com>
> wrote:
>
>> Any pointers on how to use IntelliJ for Spark development?
>> Any way to use a Scala worksheet to run like spark-shell?
>>
>
>


Re: fishing for help!

2015-12-21 Thread Eran Witkon
I'll check it out.
On Tue, 22 Dec 2015 at 00:30 Michal Klos <michal.klo...@gmail.com> wrote:

> If you are running on Amazon, then it's always a crapshoot as well.
>
> M
>
> On Dec 21, 2015, at 4:41 PM, Josh Rosen <joshro...@databricks.com> wrote:
>
> @Eran, are Server 1 and Server 2 both part of the same cluster / do they
> have similar positions in the network topology w.r.t the Spark executors?
> If Server 1 had fast network access to the executors but Server 2 was
> across a WAN then I'd expect the job to run slower from Server 2 due to
> the extra network latency / reduced bandwidth. This is assuming that you're
> running the driver in non-cluster deploy mode (so the driver process runs
> on the machine which submitted the job).
>
> On Mon, Dec 21, 2015 at 1:30 PM, Igor Berman <igor.ber...@gmail.com>
> wrote:
>
>> look for differences: packages versions, cpu/network/memory diff etc etc
>>
>>
>> On 21 December 2015 at 14:53, Eran Witkon <eranwit...@gmail.com> wrote:
>>
>>> Hi,
>>> I know it is a broad question, but can you think of reasons why a pyspark
>>> job which runs from server 1 using user 1 will run faster than the same
>>> job when running on server 2 with user 1?
>>> Eran
>>>
>>
>>
>


fishing for help!

2015-12-21 Thread Eran Witkon
Hi,
I know it is a broad question, but can you think of reasons why a pyspark job
which runs from server 1 using user 1 will run faster than the same job
when running on server 2 with user 1?
Eran


Using IntelliJ for Spark development

2015-12-21 Thread Eran Witkon
Any pointers on how to use IntelliJ for Spark development?
Any way to use a Scala worksheet to run like spark-shell?


Re: spark 1.5.2 memory leak? reading JSON

2015-12-20 Thread Eran Witkon
Once I removed the CR/LF characters from the file it worked OK.
Eran
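
A minimal sketch of the kind of workaround discussed below, assuming each file
holds one pretty-printed JSON record (the path is hypothetical):

// Read whole files, collapse each record onto a single line,
// then let the JSON reader parse the resulting RDD[String]
val oneLinePerRecord = sc.wholeTextFiles("/path/to/pretty-json/*")
  .map { case (_, content) => content.replaceAll("[\\r\\n]+", " ") }

val df = sqlContext.read.json(oneLinePerRecord)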
On Mon, 21 Dec 2015 at 06:29 Yin Huai <yh...@databricks.com> wrote:

> Hi Eran,
>
> Can you try 1.6? With the change in
> https://github.com/apache/spark/pull/10288, JSON data source will not
> throw a runtime exception if there is any record that it cannot parse.
> Instead, it will put the entire record to the column of "_corrupt_record".
>
> Thanks,
>
> Yin
>
> On Sun, Dec 20, 2015 at 9:37 AM, Eran Witkon <eranwit...@gmail.com> wrote:
>
>> Thanks for this!
>> This was the problem...
>>
>> On Sun, 20 Dec 2015 at 18:49 Chris Fregly <ch...@fregly.com> wrote:
>>
>>> hey Eran, I run into this all the time with Json.
>>>
>>> the problem is likely that your JSON is "too pretty" and extends
>>> beyond a single line, which trips up the JSON reader.
>>>
>>> my solution is usually to de-pretty the Json - either manually or
>>> through an ETL step - by stripping all white space before pointing my
>>> DataFrame/JSON reader at the file.
>>>
>>> this tool is handy for one-off scenarios:  http://jsonviewer.stack.hu
>>>
>>> for streaming use cases, you'll want to have a light de-pretty ETL step
>>> either within the Spark Streaming job after ingestion - or upstream using
>>> something like a Flume interceptor, NiFi Processor (I love NiFi), or Kafka
>>> transformation assuming those exist by now.
>>>
>>> a similar problem exists for XML, btw.  there's lots of wonky
>>> workarounds for this that use MapPartitions and all kinds of craziness.
>>>  the best option, in my opinion, is to just ETL/flatten the data to make
>>> the DataFrame reader happy.
>>>
>>> On Dec 19, 2015, at 4:55 PM, Eran Witkon <eranwit...@gmail.com> wrote:
>>>
>>> Hi,
>>> I tried the following code in spark-shell on spark1.5.2:
>>>
>>> *val df =
>>> sqlContext.read.json("/home/eranw/Workspace/JSON/sample/sample2.json")*
>>> *df.count()*
>>>
>>> 15/12/19 23:49:40 ERROR Executor: Managed memory leak detected; size =
>>> 67108864 bytes, TID = 3
>>> 15/12/19 23:49:40 ERROR Executor: Exception in task 0.0 in stage 4.0
>>> (TID 3)
>>> java.lang.RuntimeException: Failed to parse a value for data type
>>> StructType() (current token: VALUE_STRING).
>>> at scala.sys.package$.error(package.scala:27)
>>> at
>>> org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:172)
>>> at
>>> org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:251)
>>> at
>>> org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:246)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:365)
>>> at
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>>> at
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$
>>> 1.org
>>> $apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
>>>
>>> Am I doing something wrong?
>>> Eran
>>>
>>>
>


Re: DataFrame operations

2015-12-20 Thread Eran Witkon
Problem resolved, syntax issue )-:
On Mon, 21 Dec 2015 at 06:09 Jeff Zhang <zjf...@gmail.com> wrote:

> If it does not return the column you expect, then what does it return? Will
> you have 2 columns with the same column name?
>
> On Sun, Dec 20, 2015 at 7:40 PM, Eran Witkon <eranwit...@gmail.com> wrote:
>
>> Hi,
>>
>> I am a bit confused with dataframe operations.
>> I have a function which takes a string and returns a string.
>> I want to apply this function to all rows of a single column in my
>> dataframe.
>>
>> I was thinking of the following:
>> jsonData.withColumn("computedField",computeString(jsonData("hse")))
>>
>> BUT jsonData("hse") returns a Column, not the row data.
>> What am I missing here?
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


DataFrame operations

2015-12-20 Thread Eran Witkon
Hi,

I am a bit confused with dataframe operations.
I have a function which takes a string and returns a string.
I want to apply this function to all rows of a single column in my
dataframe.

I was thinking of the following:
jsonData.withColumn("computedField",computeString(jsonData("hse")))

BUT jsonData("hse") returns a Column, not the row data.
What am I missing here?
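
A hedged sketch of the usual pattern for this, wrapping the string function in
a UDF so it is applied to every row's value (computeString's body here is only
a placeholder):

import org.apache.spark.sql.functions.udf

// Placeholder string-to-string function
def computeString(s: String): String = s.trim.toLowerCase

// Wrap it as a UDF and apply it to the column
val computeStringUdf = udf(computeString _)
val result = jsonData.withColumn("computedField", computeStringUdf(jsonData("hse")))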


Re: combining multiple JSON files to one DataFrame

2015-12-20 Thread Eran Witkon
Disregard my last question - my mistake.
I accessed it as a column, not as a row:
jsonData.first.getAs[String]("cty")

Eran
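
For anyone hitting the same confusion, a short sketch of the distinction this
thread lands on, using the jsonData DataFrame quoted below:

// jsonData("cty") is a Column expression, not data:
val ctyCol = jsonData("cty")                       // org.apache.spark.sql.Column

// Row values come from actions:
val firstCty = jsonData.first.getAs[String]("cty")
val allCty   = jsonData.select("cty").map(_.getString(0)).collect()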

On Sun, Dec 20, 2015 at 11:42 AM Eran Witkon <eranwit...@gmail.com> wrote:

> Thanks, that works.
> One other thing -
> I have the following code:
>
> val jsonData = sqlContext.read.json("/home/eranw/Workspace/JSON/sample")
>
> jsonData.show()
> +--------------+----------------+---------------+---------+
> |           cty|             hse|             nm|      yrs|
> +--------------+----------------+---------------+---------+
> |United Kingdom|House of Denmark|           Cnut|1016-1035|
> |United Kingdom| House of Wessex|Edmund lronside|     1016|
> +--------------+----------------+---------------+---------+
>
> *But when I want to access one of the fields using:*
>
> *jsonData("cty")*
> *I get the name of the field not the value*
> res22: org.apache.spark.sql.Column = cty
> same goes for
> println(cty.toString)
>
> *How do I access the content of the column and pass it as an argument to a
> function?*
>
> *Eran*
>
> On Sun, Dec 20, 2015 at 10:03 AM Alexander Pivovarov <apivova...@gmail.com>
> wrote:
>
>> Just point loader to the folder. You do not need *
>> On Dec 19, 2015 11:21 PM, "Eran Witkon" <eranwit...@gmail.com> wrote:
>>
>>> Hi,
>>> Can I combine multiple JSON files to one DataFrame?
>>>
>>> I tried
>>> val df = sqlContext.read.json("/home/eranw/Workspace/JSON/sample/*")
>>> but I get an empty DF
>>> Eran
>>>
>>


Re: combining multiple JSON files to one DataFrame

2015-12-20 Thread Eran Witkon
Thanks, that works.
One other thing -
I have the following code:

val jsonData = sqlContext.read.json("/home/eranw/Workspace/JSON/sample")

jsonData.show()
+--------------+----------------+---------------+---------+
|           cty|             hse|             nm|      yrs|
+--------------+----------------+---------------+---------+
|United Kingdom|House of Denmark|           Cnut|1016-1035|
|United Kingdom| House of Wessex|Edmund lronside|     1016|
+--------------+----------------+---------------+---------+

*But when I want to access one of the fields using:*

*jsonData("cty")*
*I get the name of the field not the value*
res22: org.apache.spark.sql.Column = cty
same goes for
println(cty.toString)

*How do I access the content of the column and pass it as an argument to a
function?*

*Eran*

On Sun, Dec 20, 2015 at 10:03 AM Alexander Pivovarov <apivova...@gmail.com>
wrote:

> Just point loader to the folder. You do not need *
> On Dec 19, 2015 11:21 PM, "Eran Witkon" <eranwit...@gmail.com> wrote:
>
>> Hi,
>> Can I combine multiple JSON files to one DataFrame?
>>
>> I tried
>> val df = sqlContext.read.json("/home/eranw/Workspace/JSON/sample/*")
>> but I get an empty DF
>> Eran
>>
>


Re: error: not found: value StructType on 1.5.2

2015-12-20 Thread Eran Witkon
Yes, this works...
Thanks

On Sun, Dec 20, 2015 at 3:57 PM Peter Zhang <zhangju...@gmail.com> wrote:

> Hi Eran,
>
> Missing import package.
>
> import org.apache.spark.sql.types._
>
> will work. please try.
>
> Peter Zhang
> --
> Google
> Sent with Airmail
>
> On December 20, 2015 at 21:43:42, Eran Witkon (eranwit...@gmail.com)
> wrote:
>
> Hi,
> I am using spark-shell with version 1.5.2.
> scala> sc.version
> res17: String = 1.5.2
>
> but when trying to use StructType I am getting an error:
> val struct =
>   StructType(
> StructField("a", IntegerType, true) ::
> StructField("b", LongType, false) ::
> StructField("c", BooleanType, false) :: Nil)
>
> :31: error: not found: value StructType
>  StructType(
>
> Why?
>
>


error: not found: value StructType on 1.5.2

2015-12-20 Thread Eran Witkon
Hi,
I am using spark-shell with version 1.5.2.
scala> sc.version
res17: String = 1.5.2

but when trying to use StructType I am getting an error:
val struct =
  StructType(
StructField("a", IntegerType, true) ::
StructField("b", LongType, false) ::
StructField("c", BooleanType, false) :: Nil)

:31: error: not found: value StructType
 StructType(

Why?
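
For reference, the same snippet with the missing import pointed out in the
reply thread above, runnable as-is in spark-shell:

import org.apache.spark.sql.types._

val struct =
  StructType(
    StructField("a", IntegerType, true) ::
    StructField("b", LongType, false) ::
    StructField("c", BooleanType, false) :: Nil)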


How to convert an RDD to a DF?

2015-12-20 Thread Eran Witkon
Hi,

I have an RDD
jsonGzip
res3: org.apache.spark.rdd.RDD[(String, String, String, String)] =
MapPartitionsRDD[8] at map at :65

which I want to convert to a DataFrame with schema
so I created a schema:

val schema =
  StructType(
    StructField("cty", StringType, false) ::
    StructField("hse", StringType, false) ::
    StructField("nm", StringType, false) ::
    StructField("yrs", StringType, false) :: Nil)

and called

val unzipJSON = sqlContext.createDataFrame(jsonGzip,schema)
:36: error: overloaded method value createDataFrame with alternatives:
  (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass:
Class[_])org.apache.spark.sql.DataFrame 
  (rdd: org.apache.spark.rdd.RDD[_],beanClass:
Class[_])org.apache.spark.sql.DataFrame 
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema:
org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame

  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema:
org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[(String, String,
String, String)], org.apache.spark.sql.types.StructType)
   val unzipJSON = sqlContext.createDataFrame(jsonGzip,schema)


But as you see I don't have the right RDD type.

So how can I get a DataFrame with the right column names?
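
A hedged sketch of the step that turns out to be missing here: either map the
tuple RDD to an RDD of Rows and apply the schema, or use toDF. Both assume the
jsonGzip RDD and the schema shown above:

import org.apache.spark.sql.Row

// Option 1: explicit schema over an RDD[Row]
val rowRdd = jsonGzip.map { case (cty, hse, nm, yrs) => Row(cty, hse, nm, yrs) }
val unzipJSON = sqlContext.createDataFrame(rowRdd, schema)

// Option 2: let Spark derive the schema from the tuple and just name the columns
import sqlContext.implicits._
val unzipJSON2 = jsonGzip.toDF("cty", "hse", "nm", "yrs")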


Re: How to convert an RDD to a DF?

2015-12-20 Thread Eran Witkon
I might be missing your point, but I don't get it.
My understanding is that I need an RDD containing Rows, but how do I get it?

I started with a DataFrame, ran a map on it, and got an
RDD[(String, String, String, String)]. Now I want to convert it back to a
DataFrame and it is failing.

Why?


On Sun, Dec 20, 2015 at 4:49 PM Ted Yu <yuzhih...@gmail.com> wrote:

> See the comment for createDataFrame(rowRDD: RDD[Row], schema: StructType)
> method:
>
>* Creates a [[DataFrame]] from an [[RDD]] containing [[Row]]s using the
> given schema.
>* It is important to make sure that the structure of every [[Row]] of
> the provided RDD matches
>* the provided schema. Otherwise, there will be runtime exception.
>* Example:
>* {{{
>*  import org.apache.spark.sql._
>*  import org.apache.spark.sql.types._
>*  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>*
>*  val schema =
>*StructType(
>*  StructField("name", StringType, false) ::
>*  StructField("age", IntegerType, true) :: Nil)
>*
>*  val people =
>*sc.textFile("examples/src/main/resources/people.txt").map(
>*  _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
>*  val dataFrame = sqlContext.createDataFrame(people, schema)
>*  dataFrame.printSchema
>*  // root
>*  // |-- name: string (nullable = false)
>*  // |-- age: integer (nullable = true)
>
> Cheers
>
> On Sun, Dec 20, 2015 at 6:31 AM, Eran Witkon <eranwit...@gmail.com> wrote:
>
>> Hi,
>>
>> I have an RDD
>> jsonGzip
>> res3: org.apache.spark.rdd.RDD[(String, String, String, String)] =
>> MapPartitionsRDD[8] at map at :65
>>
>> which I want to convert to a DataFrame with schema
>> so I created a schema:
>>
>> val schema =
>>   StructType(
>> StructField("cty", StringType, false) ::
>>   StructField("hse", StringType, false) ::
>> StructField("nm", StringType, false) ::
>>   StructField("yrs", StringType, false) ::Nil)
>>
>> and called
>>
>> val unzipJSON = sqlContext.createDataFrame(jsonGzip,schema)
>> :36: error: overloaded method value createDataFrame with 
>> alternatives:
>>   (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: 
>> Class[_])org.apache.spark.sql.DataFrame 
>>   (rdd: org.apache.spark.rdd.RDD[_],beanClass: 
>> Class[_])org.apache.spark.sql.DataFrame 
>>   (rowRDD: 
>> org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: 
>> org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame 
>>   (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: 
>> org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
>>  cannot be applied to (org.apache.spark.rdd.RDD[(String, String, String, 
>> String)], org.apache.spark.sql.types.StructType)
>>val unzipJSON = sqlContext.createDataFrame(jsonGzip,schema)
>>
>>
>> But as you see I don't have the right RDD type.
>>
>> So how can I get a DataFrame with the right column names?
>>
>>
>>
>


Re: spark 1.5.2 memory leak? reading JSON

2015-12-20 Thread Eran Witkon
Thanks for this!
This was the problem...
On Sun, 20 Dec 2015 at 18:49 Chris Fregly <ch...@fregly.com> wrote:

> hey Eran, I run into this all the time with Json.
>
> the problem is likely that your JSON is "too pretty" and extends beyond
> a single line, which trips up the JSON reader.
>
> my solution is usually to de-pretty the Json - either manually or through
> an ETL step - by stripping all white space before pointing my
> DataFrame/JSON reader at the file.
>
> this tool is handy for one-off scenarios:  http://jsonviewer.stack.hu
>
> for streaming use cases, you'll want to have a light de-pretty ETL step
> either within the Spark Streaming job after ingestion - or upstream using
> something like a Flume interceptor, NiFi Processor (I love NiFi), or Kafka
> transformation assuming those exist by now.
>
> a similar problem exists for XML, btw.  there's lots of wonky workarounds
> for this that use MapPartitions and all kinds of craziness.  the best
> option, in my opinion, is to just ETL/flatten the data to make the
> DataFrame reader happy.
>
> On Dec 19, 2015, at 4:55 PM, Eran Witkon <eranwit...@gmail.com> wrote:
>
> Hi,
> I tried the following code in spark-shell on spark1.5.2:
>
> *val df =
> sqlContext.read.json("/home/eranw/Workspace/JSON/sample/sample2.json")*
> *df.count()*
>
> 15/12/19 23:49:40 ERROR Executor: Managed memory leak detected; size =
> 67108864 bytes, TID = 3
> 15/12/19 23:49:40 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID
> 3)
> java.lang.RuntimeException: Failed to parse a value for data type
> StructType() (current token: VALUE_STRING).
> at scala.sys.package$.error(package.scala:27)
> at
> org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:172)
> at
> org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:251)
> at
> org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:246)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:365)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$
> 1.org
> $apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
>
> Am I doing something wrong?
> Eran
>
>


Re: How to convert an RDD to a DF?

2015-12-20 Thread Eran Witkon
Got it to work, thanks
On Sun, 20 Dec 2015 at 17:01 Eran Witkon <eranwit...@gmail.com> wrote:

> I might be missing your point, but I don't get it.
> My understanding is that I need an RDD containing Rows, but how do I get it?
>
> I started with a DataFrame, ran a map on it, and got an
> RDD[(String, String, String, String)]. Now I want to convert it back to a
> DataFrame and it is failing.
>
> Why?
>
>
> On Sun, Dec 20, 2015 at 4:49 PM Ted Yu <yuzhih...@gmail.com> wrote:
>
>> See the comment for createDataFrame(rowRDD: RDD[Row], schema: StructType)
>> method:
>>
>>* Creates a [[DataFrame]] from an [[RDD]] containing [[Row]]s using
>> the given schema.
>>* It is important to make sure that the structure of every [[Row]] of
>> the provided RDD matches
>>* the provided schema. Otherwise, there will be runtime exception.
>>* Example:
>>* {{{
>>*  import org.apache.spark.sql._
>>*  import org.apache.spark.sql.types._
>>*  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>*
>>*  val schema =
>>*StructType(
>>*  StructField("name", StringType, false) ::
>>*  StructField("age", IntegerType, true) :: Nil)
>>*
>>*  val people =
>>*sc.textFile("examples/src/main/resources/people.txt").map(
>>*  _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
>>*  val dataFrame = sqlContext.createDataFrame(people, schema)
>>*  dataFrame.printSchema
>>*  // root
>>*  // |-- name: string (nullable = false)
>>*  // |-- age: integer (nullable = true)
>>
>> Cheers
>>
>> On Sun, Dec 20, 2015 at 6:31 AM, Eran Witkon <eranwit...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have an RDD
>>> jsonGzip
>>> res3: org.apache.spark.rdd.RDD[(String, String, String, String)] =
>>> MapPartitionsRDD[8] at map at :65
>>>
>>> which I want to convert to a DataFrame with schema
>>> so I created a schema:
>>>
>>> val schema =
>>>   StructType(
>>> StructField("cty", StringType, false) ::
>>>   StructField("hse", StringType, false) ::
>>> StructField("nm", StringType, false) ::
>>>   StructField("yrs", StringType, false) ::Nil)
>>>
>>> and called
>>>
>>> val unzipJSON = sqlContext.createDataFrame(jsonGzip,schema)
>>> :36: error: overloaded method value createDataFrame with 
>>> alternatives:
>>>   (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: 
>>> Class[_])org.apache.spark.sql.DataFrame 
>>>   (rdd: org.apache.spark.rdd.RDD[_],beanClass: 
>>> Class[_])org.apache.spark.sql.DataFrame 
>>>   (rowRDD: 
>>> org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: 
>>> org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame 
>>>   (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: 
>>> org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
>>>  cannot be applied to (org.apache.spark.rdd.RDD[(String, String, String, 
>>> String)], org.apache.spark.sql.types.StructType)
>>>val unzipJSON = sqlContext.createDataFrame(jsonGzip,schema)
>>>
>>>
>>> But as you see I don't have the right RDD type.
>>>
>>> So how can I get a DataFrame with the right column names?
>>>
>>>
>>>
>>


spark 1.5.2 memory leak? reading JSON

2015-12-19 Thread Eran Witkon
Hi,
I tried the following code in spark-shell on spark1.5.2:

*val df =
sqlContext.read.json("/home/eranw/Workspace/JSON/sample/sample2.json")*
*df.count()*

15/12/19 23:49:40 ERROR Executor: Managed memory leak detected; size =
67108864 bytes, TID = 3
15/12/19 23:49:40 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 3)
java.lang.RuntimeException: Failed to parse a value for data type
StructType() (current token: VALUE_STRING).
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:172)
at
org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:251)
at
org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:246)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:365)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$
1.org
$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)

Am I doing something wrong?
Eran


Re: Using Spark to process JSON with a gzip field

2015-12-19 Thread Eran Witkon
Thanks. Since it is just a snippet, do you mean that Inflater comes from
ZLIB?
Eran
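
(For reference, that Inflater lives in java.util.zip.) A sketch of how the
same map step might look with GZIP instead, assuming the elements expose a
compressed byte payload the way the snippet quoted below does:

import java.io.ByteArrayInputStream
import java.util.zip.GZIPInputStream
import scala.io.Source

compressedStream.map { x =>
  // x.getPayload is assumed to return the gzip-compressed bytes
  val in = new GZIPInputStream(new ByteArrayInputStream(x.getPayload))
  try Source.fromInputStream(in, "UTF-8").mkString finally in.close()
}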

On Fri, Dec 18, 2015 at 11:37 AM Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Something like this? This one uses ZLIB compression; you can replace
> the decompression logic with a GZIP one in your case.
>
> compressedStream.map(x => {
>   val inflater = new Inflater()
>   inflater.setInput(x.getPayload)
>   val decompressedData = new Array[Byte](x.getPayload.size * 2)
>   var count = inflater.inflate(decompressedData)
>   var finalData = decompressedData.take(count)
>   while (count > 0) {
> count = inflater.inflate(decompressedData)
> finalData = finalData ++ decompressedData.take(count)
>   }
>   new String(finalData)
> })
>
>
>
>
> Thanks
> Best Regards
>
> On Wed, Dec 16, 2015 at 10:02 PM, Eran Witkon <eranwit...@gmail.com>
> wrote:
>
>> Hi,
>> I have a few JSON files in which one of the fields is a binary field -
>> this field is the output of running GZIP on a JSON stream and compressing
>> it into the binary field.
>>
>> Now I want to decompress the field and get the output JSON.
>> I was thinking of running a map operation and passing a function to it
>> which will decompress each JSON file.
>> The function will find the right field in the outer JSON and then
>> run GUNZIP on it.
>>
>> 1) Is this a valid practice for a Spark map job?
>> 2) Any pointers on how to do that?
>>
>> Eran
>>
>
>


combining multiple JSON files to one DataFrame

2015-12-19 Thread Eran Witkon
Hi,
Can I combine multiple JSON files to one DataFrame?

I tried
val df = sqlContext.read.json("/home/eranw/Workspace/JSON/sample/*")
but I get an empty DF
Eran


Can't run Spark on YARN

2015-12-17 Thread Eran Witkon
Hi,
I am trying to install Spark 1.5.2 on Apache Hadoop 2.6 with Hive and YARN.

spark-env.sh
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop

bash_profile
#HADOOP VARIABLES START
export JAVA_HOME=/usr/lib/jvm/java-8-oracle/
export HADOOP_INSTALL=/usr/local/hadoop
export PATH=$PATH:$HADOOP_INSTALL/bin
export PATH=$PATH:$HADOOP_INSTALL/sbin
export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_HOME=$HADOOP_INSTALL
export HADOOP_HDFS_HOME=$HADOOP_INSTALL
export YARN_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"
export HADOOP_USER_CLASSPATH_FIRST=true
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export YARN_CONF_DIR=/usr/local/hadoop/etc/hadoop
#HADOOP VARIABLES END

export SPARK_HOME=/usr/local/spark
export HIVE_HOME=/usr/local/hive
export PATH=$PATH:$HIVE_HOME/bin


When I run spark-shell
./bin/spark-shell --master yarn-client

Output:
15/12/17 22:22:07 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/12/17 22:22:07 INFO spark.SecurityManager: Changing view acls to: hduser
15/12/17 22:22:07 INFO spark.SecurityManager: Changing modify acls to:
hduser
15/12/17 22:22:07 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hduser); users with modify permissions: Set(hduser)
15/12/17 22:22:07 INFO spark.HttpServer: Starting HTTP Server
15/12/17 22:22:07 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/12/17 22:22:08 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:38389
15/12/17 22:22:08 INFO util.Utils: Successfully started service 'HTTP class
server' on port 38389.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
  /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_66)
Type in expressions to have them evaluated.
Type :help for more information.
15/12/17 22:22:11 WARN util.Utils: Your hostname, eranw-Lenovo-Yoga-2-Pro
resolves to a loopback address: 127.0.1.1; using 10.0.0.1 instead (on
interface wlp1s0)
15/12/17 22:22:11 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind
to another address
15/12/17 22:22:11 INFO spark.SparkContext: Running Spark version 1.5.2
15/12/17 22:22:11 INFO spark.SecurityManager: Changing view acls to: hduser
15/12/17 22:22:11 INFO spark.SecurityManager: Changing modify acls to:
hduser
15/12/17 22:22:11 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hduser); users with modify permissions: Set(hduser)
15/12/17 22:22:11 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/12/17 22:22:11 INFO Remoting: Starting remoting
15/12/17 22:22:12 INFO util.Utils: Successfully started service
'sparkDriver' on port 36381.
15/12/17 22:22:12 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@10.0.0.1:36381]
15/12/17 22:22:12 INFO spark.SparkEnv: Registering MapOutputTracker
15/12/17 22:22:12 INFO spark.SparkEnv: Registering BlockManagerMaster
15/12/17 22:22:12 INFO storage.DiskBlockManager: Created local directory at
/tmp/blockmgr-139fac31-5f21-4c61-9575-3110d5205f7d
15/12/17 22:22:12 INFO storage.MemoryStore: MemoryStore started with
capacity 530.0 MB
15/12/17 22:22:12 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-955ef002-a802-49c6-b440-0656861f737c/httpd-2127cbe1-97d7-40a5-a96f-75216f115f00
15/12/17 22:22:12 INFO spark.HttpServer: Starting HTTP Server
15/12/17 22:22:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/12/17 22:22:12 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:36760
15/12/17 22:22:12 INFO util.Utils: Successfully started service 'HTTP file
server' on port 36760.
15/12/17 22:22:12 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/12/17 22:22:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/12/17 22:22:12 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/12/17 22:22:12 INFO util.Utils: Successfully started service 'SparkUI'
on port 4040.
15/12/17 22:22:12 INFO ui.SparkUI: Started SparkUI at http://10.0.0.1:4040
15/12/17 22:22:12 WARN metrics.MetricsSystem: Using default name
DAGScheduler for source because spark.app.id is not set.
15/12/17 22:22:12 INFO client.RMProxy: Connecting to ResourceManager at /
0.0.0.0:8032
15/12/17 22:22:12 INFO yarn.Client: Requesting a new application from
cluster with 1 NodeManagers
15/12/17 22:22:12 INFO yarn.Client: Verifying our application has not
requested more than the maximum memory capability of the cluster (8192 MB
per container)
15/12/17 22:22:12 INFO yarn.Client: Will allocate AM container, with 896 MB
memory including 384 MB overhead
15/12/17 22:22:12 INFO yarn.Client: Setting up container launch context for
our AM
15/12/17 22:22:12 INFO yarn.Client: Setting up the launch 

wholeTextFiles for ~8000 files - problem

2015-12-16 Thread Eran Witkon
Hi,
I have about 8K files in about 10 directories on HDFS, and I need to add a
column to every file with the file name (e.g. file1.txt adds a column with
"file1.txt", file2.txt with "file2.txt", etc.).

The current approach was to read all files using *sc.wholeTextFiles("myPath")*,
take the file name as the key, and add it as a column to each file.
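
A minimal sketch of that approach, assuming the files are plain line-oriented
text and the file name is appended as a tab-separated column (the delimiter is
an assumption):

// wholeTextFiles yields (path, fileContent) pairs
val withFileName = sc.wholeTextFiles("/MySource/dir1/*")
  .flatMap { case (path, content) =>
    val fileName = path.split("/").last
    content.split("\n").map(line => line + "\t" + fileName)
  }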

1) I run this on 5 servers, each with 24 cores and 24GB RAM, with a config of:
*spark-shell --master yarn-client --executor-cores 5 --executor-memory 5G*
But when we run this on all directories at once
(sc.wholeTextFiles("/MySource/*/*")) I am getting *java.lang.OutOfMemoryError:
Java heap space*.
When running on a single directory, *sc.wholeTextFiles("/MySource/dir1/*")*,
all works well.

2) One other option is not to use wholeTextFiles but to read each line with
sc.textFile - but how can I get the file name with textFile?

Eran


Using Spark to process JSON with a gzip field

2015-12-16 Thread Eran Witkon
Hi,
I have a few JSON files in which one of the fields is a binary field - this
field is the output of running GZIP on a JSON stream and compressing it into
the binary field.

Now I want to decompress the field and get the output JSON.
I was thinking of running a map operation and passing a function to it
which will decompress each JSON file.
The function will find the right field in the outer JSON and then run
GUNZIP on it.

1) Is this a valid practice for a Spark map job?
2) Any pointers on how to do that?

Eran


Re: Spark big rdd problem

2015-12-16 Thread Eran Witkon
I ran the yarn logs command and got the following:
a set of YarnAllocator warnings, 'expected to find requests, but found none',
then an error:
Akka ErrorMonitor: associationError ...
But then I still get final app status: SUCCEEDED, exit code 0.
What do these errors mean?
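
For the container-killed-by-YARN case mentioned in the replies below, the knob
is the YARN memory overhead; a sketch of how it might be set (the value is a
placeholder):

spark-shell --master yarn-client \
  --executor-memory 5G \
  --conf spark.yarn.executor.memoryOverhead=1024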

On Wed, 16 Dec 2015 at 08:27 Eran Witkon <eranwit...@gmail.com> wrote:

> But what if I don't have more memory?
> On Wed, 16 Dec 2015 at 08:13 Zhan Zhang <zzh...@hortonworks.com> wrote:
>
>> There are two cases here. If the container is killed by yarn, you can
>> increase jvm overhead. Otherwise, you have to increase the executor-memory
>> if there is no memory leak happening.
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> On Dec 15, 2015, at 9:58 PM, Eran Witkon <eranwit...@gmail.com> wrote:
>>
>> If the problem is containers trying to use more memory than they are allowed,
>> how do I limit them? I already have executor-memory 5G.
>> Eran
>> On Tue, 15 Dec 2015 at 23:10 Zhan Zhang <zzh...@hortonworks.com> wrote:
>>
>>> You should be able to get the logs from YARN by “yarn logs
>>> -applicationId xxx”, where you can possibly find the cause.
>>>
>>> Thanks.
>>>
>>> Zhan Zhang
>>>
>>> On Dec 15, 2015, at 11:50 AM, Eran Witkon <eranwit...@gmail.com> wrote:
>>>
>>> > When running
>>> > val data = sc.wholeTextFiles("someDir/*"); data.count()
>>> >
>>> > I get numerous warnings from YARN until I get an Akka association exception.
>>> > Can someone explain what happens when Spark loads this RDD and can't
>>> fit it all in memory?
>>> > Based on the exception it looks like the server is disconnecting from
>>> yarn and failing... Any idea why? The code is simple but still failing...
>>> > Eran
>>>
>>>
>>


Spark big rdd problem

2015-12-15 Thread Eran Witkon
When running
val data = sc.wholeTextFiles("someDir/*"); data.count()

I get numerous warnings from YARN until I get an Akka association exception.
Can someone explain what happens when Spark loads this RDD and can't fit it
all in memory?
Based on the exception it looks like the server is disconnecting from YARN
and failing... Any idea why? The code is simple but still failing...
Eran


Re: Spark big rdd problem

2015-12-15 Thread Eran Witkon
If the problem is containers trying to use more memory than they are allowed,
how do I limit them? I already have executor-memory 5G.
Eran
On Tue, 15 Dec 2015 at 23:10 Zhan Zhang <zzh...@hortonworks.com> wrote:

> You should be able to get the logs from YARN by “yarn logs -applicationId
> xxx”, where you can possibly find the cause.
>
> Thanks.
>
> Zhan Zhang
>
> On Dec 15, 2015, at 11:50 AM, Eran Witkon <eranwit...@gmail.com> wrote:
>
> > When running
> > val data = sc.wholeTextFiles("someDir/*"); data.count()
> >
> > I get numerous warnings from YARN until I get an Akka association exception.
> > Can someone explain what happens when Spark loads this RDD and can't fit
> it all in memory?
> > Based on the exception it looks like the server is disconnecting from
> yarn and failing... Any idea why? The code is simple but still failing...
> > Eran
>
>


Re: Spark big rdd problem

2015-12-15 Thread Eran Witkon
But what if I don't have more memory?
On Wed, 16 Dec 2015 at 08:13 Zhan Zhang <zzh...@hortonworks.com> wrote:

> There are two cases here. If the container is killed by yarn, you can
> increase jvm overhead. Otherwise, you have to increase the executor-memory
> if there is no memory leak happening.
>
> Thanks.
>
> Zhan Zhang
>
> On Dec 15, 2015, at 9:58 PM, Eran Witkon <eranwit...@gmail.com> wrote:
>
> If the problem is containers trying to use more memory than they are allowed,
> how do I limit them? I already have executor-memory 5G.
> Eran
> On Tue, 15 Dec 2015 at 23:10 Zhan Zhang <zzh...@hortonworks.com> wrote:
>
>> You should be able to get the logs from YARN by “yarn logs -applicationId
>> xxx”, where you can possibly find the cause.
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> On Dec 15, 2015, at 11:50 AM, Eran Witkon <eranwit...@gmail.com> wrote:
>>
>> > When running
>> > val data = sc.wholeTextFiles("someDir/*"); data.count()
>> >
>> > I get numerous warnings from YARN until I get an Akka association exception.
>> > Can someone explain what happens when Spark loads this RDD and can't fit
>> it all in memory?
>> > Based on the exception it looks like the server is disconnecting from
>> yarn and failing... Any idea why? The code is simple but still failing...
>> > Eran
>>
>>
>