Exposing dataframe via thrift server

2016-03-30 Thread ram kumar
Hi,

I started thrift server
cd $SPARK_HOME
./sbin/start-thriftserver.sh

Then, jdbc client
$ ./bin/beeline
Beeline version 1.5.2 by Apache Hive
beeline>!connect jdbc:hive2://ip:1
show tables;
++--+--+
| tableName  | isTemporary  |
++--+--+
| check  | false|
| test   | false|
++--+--+
5 rows selected (0.126 seconds)
>

It shows tables that are persisted in the Hive metastore using saveAsTable.
Temp tables (registerTempTable) can't be viewed.
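
Temp tables registered with registerTempTable live only in the SQLContext/HiveContext
that created them, so a Thrift server started as a separate process cannot see them. A
minimal sketch (it assumes a Spark build that includes the thrift-server module; the
input path and table name are just placeholders) of starting the Thrift server inside
the same application so it shares the context holding the temp table:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hiveContext = new HiveContext(sc)
val df = hiveContext.read.json("/tmp/events.json")   // placeholder input
df.registerTempTable("events_tmp")                   // temp table, visible only in this context

// Start the Thrift server in-process; beeline clients connecting to it
// will now see events_tmp listed with isTemporary = true.
HiveThriftServer2.startWithContext(hiveContext)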

Can anyone help me with this?
Thanks


Unable to Run Spark Streaming Job in Hadoop YARN mode

2016-03-30 Thread Soni spark
Hi All,

I am unable to run a Spark Streaming job in my Hadoop cluster; it behaves
unexpectedly. When I submit a job, it fails with a socket exception in HDFS;
if I run the same job a second or third time, it runs for a while and then
stops.

I am confused. Is there any configuration in yarn-site.xml that is specific to
Spark?

Please suggest.


Adding Recurrent Neural Network to Spark pipeline.

2016-03-30 Thread Thamali Wijewardhana
Hi all,

I have created a program that uses recurrent neural networks for sentiment
analysis. It is built on the Deeplearning4j library and runs fine, finishing
within a short time.

Then I added the above Deeplearning4j-based program to a Spark pipeline and
created a pipeline program. This pipeline program takes a long time to run:
about 10 hours to process 1000 data rows.

Can you suggest how to solve this problem and improve the speed of my
pipeline program?

Thanks
Thamali


Re: sqlContext.cacheTable + yarn client mode

2016-03-30 Thread Jeff Zhang
The table data is cached in the block managers on the executors. Could you
paste the OOM log from your driver?
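
For reference, a small sketch (the table name follows the thread's example) that
materializes the cache and then prints per-executor storage numbers; the Storage tab of
the web UI shows the same per-executor view:

sqlContext.cacheTable("tableName")
// An action is needed before the cached columnar data is actually built on the executors.
sqlContext.sql("SELECT COUNT(*) FROM tableName").show()

// (max, remaining) storage memory per block manager; in yarn-client mode the cached
// partitions themselves live on the executors, not in the local driver process.
sc.getExecutorMemoryStatus.foreach { case (blockManager, (max, remaining)) =>
  println(s"$blockManager: using ${max - remaining} of $max bytes")
}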

On Thu, Mar 31, 2016 at 1:24 PM, Soam Acharya  wrote:

> Hi folks,
>
> I understand that invoking sqlContext.cacheTable("tableName") will load
> the table into a compressed in-memory columnar format. When Spark is
> launched via spark shell in YARN client mode, is the table loaded into the
> local Spark driver process in addition to the executors in the Hadoop
> cluster or is it just loaded into the executors? We're exploring an OOM
> issue on the local Spark driver for some SQL code and was wondering if the
> local cache load could be the culprit.
>
> Appreciate any thoughts. BTW, we're running Spark 1.6.0 on this particular
> cluster.
>
> Regards,
>
> Soam
>



-- 
Best Regards

Jeff Zhang


sqlContext.cacheTable + yarn client mode

2016-03-30 Thread Soam Acharya
Hi folks,

I understand that invoking sqlContext.cacheTable("tableName") will load the
table into a compressed in-memory columnar format. When Spark is launched
via spark shell in YARN client mode, is the table loaded into the local
Spark driver process in addition to the executors in the Hadoop cluster or
is it just loaded into the executors? We're exploring an OOM issue on the
local Spark driver for some SQL code and was wondering if the local cache
load could be the culprit.

Appreciate any thoughts. BTW, we're running Spark 1.6.0 on this particular
cluster.

Regards,

Soam


SparkSQL Dataframe : partitionColumn, lowerBound, upperBound, numPartitions in context of reading from MySQL

2016-03-30 Thread Soumya Simanta
I'm trying to understand what the following configuration options mean and their
implications for reading data from a MySQL table. I'm looking for options
that will impact my read throughput when reading data from a large table.


Thanks.





partitionColumn, lowerBound, upperBound, numPartitions: these options must
all be specified if any of them is specified. They describe how to
partition the table when reading in parallel from multiple workers.
partitionColumn must be a numeric column from the table in question. Notice
that lowerBound and upperBound are just used to decide the partition stride,
not for filtering the rows in the table. So all rows in the table will be
partitioned and returned.
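
For concreteness, a minimal sketch of how these options are passed (the connection URL,
table and column names below are placeholders, not from the thread):

// Reads `orders` in 8 parallel tasks; each task issues a query bounded to its own
// slice of the id range [1, 1000000), so read throughput scales with numPartitions
// as long as MySQL can serve that many concurrent range scans.
val ordersDF = sqlContext.read.format("jdbc").options(Map(
  "url"             -> "jdbc:mysql://dbhost:3306/shop",   // placeholder
  "driver"          -> "com.mysql.jdbc.Driver",
  "dbtable"         -> "orders",                          // placeholder
  "partitionColumn" -> "id",                              // must be numeric
  "lowerBound"      -> "1",
  "upperBound"      -> "1000000",
  "numPartitions"   -> "8"
)).load()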


How to design the input source of spark stream

2016-03-30 Thread kramer2...@126.com
Hi

My environment is as follows:

5 nodes; each node generates a big CSV file every 5 minutes. I need Spark
Streaming to analyze these 5 files every five minutes and generate a
report.

I am planning to do it this way:

1. Put those 5 files into an HDFS directory called /data
2. Merge them into one big file in that directory
3. Use the Spark Streaming constructor textFileStream('/data') to generate my
inputDStream

The problem with this approach is that I do not know how to merge the 5 files
in HDFS. It seems very difficult to do in Python.

So my questions are:

1. Can you tell me how to merge files in HDFS with Python?
2. Do you know some other way to input those files into Spark?
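
For what it's worth, textFileStream monitors a directory and picks up every new file
that appears in it, so the merge step may not be needed at all. A minimal Scala sketch
(the /data path is the one from the plan above; the 5-minute batch interval is an
assumption):

import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(sc, Minutes(5))

// Each batch contains the lines of all files that appeared in /data during that interval,
// however many files there are, so the five per-node files need not be merged first.
val lines = ssc.textFileStream("hdfs:///data")
lines.count().print()   // placeholder for the real report logic

ssc.start()
ssc.awaitTermination()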





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-design-the-input-source-of-spark-stream-tp26641.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: aggregateByKey on PairRDD

2016-03-30 Thread write2sivakumar@gmail


Hi,

We can use combineByKey to achieve this:

val finalRDD = tempRDD.combineByKey(
  (x: (Any, Any)) => (x),
  (acc: (Any, Any), x) => (acc, x),
  (acc1: (Any, Any), acc2: (Any, Any)) => (acc1, acc2))

finalRDD.collect.foreach(println)
// (amazon,((book1, tech),(book2,tech)))
// (barns, (book,tech))
// (eBay, (book1,tech))

Thanks,
Sivakumar
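
Since the original question asked for a List per brand, here is a sketch of an
aggregateByKey variant (types and names follow the thread's example) that collects the
(product, key) pairs into a list instead of nesting tuples:

// tempRDD: RDD[(String, (String, String))]  --  (brand, (product, key))
val resultSetRDD = tempRDD.aggregateByKey(List.empty[(String, String)])(
  (acc, value) => value :: acc,      // add a (product, key) pair within a partition
  (acc1, acc2) => acc1 ::: acc2)     // merge the per-partition lists

resultSetRDD.collect.foreach(println)
// e.g. (amazon,List((book2,tech), (book1,tech)))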

 Original message 
From: Daniel Haviv
Date: 30/03/2016 18:58 (GMT+08:00)
To: Akhil Das
Cc: Suniti Singh, user@spark.apache.org, dev
Subject: Re: aggregateByKey on PairRDD

Hi,
Shouldn't groupByKey be avoided
(https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html)?

Thank you,
Daniel
On Wed, Mar 30, 2016 at 9:01 AM, Akhil Das  wrote:
Isn't it what tempRDD.groupByKey does?
Thanks
Best Regards

On Wed, Mar 30, 2016 at 7:36 AM, Suniti Singh  wrote:
Hi All,
I have an RDD with data in the following form:

tempRDD: RDD[(String, (String, String))]   // (brand, (product, key))

("amazon",("book1","tech"))
("eBay",("book1","tech"))
("barns",("book","tech"))
("amazon",("book2","tech"))

I would like to group the data by brand and get the result set in the
following format:

resultSetRDD: RDD[(String, List[(String, String)])]

I tried using aggregateByKey but don't quite see how to achieve this. Or is
there any other way to achieve this?

val resultSetRDD = tempRDD.aggregateByKey("")(
  {case (aggr, value) => aggr + String.valueOf(value) + ","},
  (aggr1, aggr2) => aggr1 + aggr2)

resultSetRDD = (amazon,("book1","tech"),("book2","tech"))

Thanks,
Suniti






Re: PySpark saving to MongoDB: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

2016-03-30 Thread Russell Jurney
Actually, I can imagine a one- or two-line fix for this bug: call
row.asDict() inside a wrapper for DataFrame.rdd. Probably I'm deluding myself
that this could be so easily resolved? :)

On Wed, Mar 30, 2016 at 6:10 PM, Russell Jurney 
wrote:

> Thanks to some excellent work by Luke Lovett, we have confirmed this is a
> bug. DataFrame.rdds are not the same as normal RDDs, they are serialized
> differently. It may just be unsupported functionality in PySpark. If that
> is the case, I think this should be added/fixed soon.
>
> The bug is here: https://issues.apache.org/jira/browse/SPARK-14229
> More info about the workaround from Luke is here:
> https://jira.mongodb.org/browse/HADOOP-276
>
> Please follow the SPARK bug if you're here, as more votes will get it more
> attention. I'm surprised that this hasn't been previously reported, as
> saving to a database is a pretty common thing to do from PySpark, and lots
> of analysis must be happening in DataFrames in PySpark?
>
> Anyway, the workaround for this bug is easy, cast the rows as dicts:
>
> my_dataframe = my_dataframe.map(lambda row: row.asDict())
>
>
> On Mon, Mar 28, 2016 at 8:08 PM, Russell Jurney 
> wrote:
>
>> btw, they can't be saved to BSON either. This seems a generic issue, can
>> anyone else reproduce this?
>>
>> On Mon, Mar 28, 2016 at 8:02 PM, Russell Jurney > > wrote:
>>
>>> I created a JIRA: https://issues.apache.org/jira/browse/SPARK-14229
>>>
>>> On Mon, Mar 28, 2016 at 7:43 PM, Russell Jurney <
>>> russell.jur...@gmail.com> wrote:
>>>
 Ted, I am using the .rdd method, see above, but for some reason these
 RDDs can't be saved to MongoDB or ElasticSearch.

 I think this is a bug in PySpark/DataFrame. I can't think of another
 explanation... somehow DataFrame.rdd RDDs are not able to be stored to an
 arbitrary Hadoop OutputFormat. When I do this:

 on_time_lines =
 sc.textFile("../data/On_Time_On_Time_Performance_2015.jsonl.gz")
 on_time_performance = on_time_lines.map(lambda x: json.loads(x))


 on_time_performance.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')


 It works. Same data, but loaded as textFile instead of DataFrame (via
 json/parquet dataframe loading).

 It is the DataFrame.rdd bit that is broken. I will file a JIRA.

 Does anyone know a workaround?

 On Mon, Mar 28, 2016 at 7:28 PM, Ted Yu  wrote:

> See this method:
>
>   lazy val rdd: RDD[T] = {
>
> On Mon, Mar 28, 2016 at 6:30 PM, Russell Jurney <
> russell.jur...@gmail.com> wrote:
>
>> Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD.
>> This seems related to DataFrames. Is there a way to convert a DataFrame's
>> RDD to a 'normal' RDD?
>>
>>
>> On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney <
>> russell.jur...@gmail.com> wrote:
>>
>>> I filed a JIRA  in the
>>> mongo-hadoop project, but I'm curious if anyone else has seen this 
>>> issue.
>>> Anyone have any idea what to do? I can't save to Mongo from PySpark. A
>>> contrived example works, but a dataframe does not.
>>>
>>> I activate pymongo_spark and load a dataframe:
>>>
>>> import pymongo
>>> import pymongo_spark
>>> # Important: activate pymongo_spark.
>>> pymongo_spark.activate()
>>>
>>> on_time_dataframe =
>>> sqlContext.read.parquet('../data/on_time_performance.parquet')
>>>
>>> Then I try saving to MongoDB in two ways:
>>>
>>>
>>> on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>
>>> on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
>>>   path='file://unused',
>>>   outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
>>>   keyClass='org.apache.hadoop.io.Text',
>>>   valueClass='org.apache.hadoop.io.MapWritable',
>>>   conf={"mongo.output.uri":
>>> "mongodb://localhost:27017/agile_data_science.on_time_performance"}
>>> )
>>>
>>>
>>> But I always get this error:
>>>
>>> In [7]:
>>> on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>
>>> 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to
>>> process : 1
>>>
>>> 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at
>>> PythonRDD.scala:393
>>>
>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at
>>> PythonRDD.scala:393) with 1 output partitions
>>>
>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage:
>>> ResultStage 2 (runJob at PythonRDD.scala:393)
>>>
>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final
>>> stage: List()
>>>
>>> 16/03/28 

Re: PySpark saving to MongoDB: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

2016-03-30 Thread Russell Jurney
Thanks to some excellent work by Luke Lovett, we have confirmed this is a
bug. DataFrame.rdds are not the same as normal RDDs, they are serialized
differently. It may just be unsupported functionality in PySpark. If that
is the case, I think this should be added/fixed soon.

The bug is here: https://issues.apache.org/jira/browse/SPARK-14229
More info about the workaround from Luke is here:
https://jira.mongodb.org/browse/HADOOP-276

Please follow the SPARK bug if you're here, as more votes will get it more
attention. I'm surprised that this hasn't been previously reported, as
saving to a database is a pretty common thing to do from PySpark, and lots
of analysis must be happening in DataFrames in PySpark?

Anyway, the workaround for this bug is easy, cast the rows as dicts:

my_dataframe = my_dataframe.map(lambda row: row.asDict())


On Mon, Mar 28, 2016 at 8:08 PM, Russell Jurney 
wrote:

> btw, they can't be saved to BSON either. This seems a generic issue, can
> anyone else reproduce this?
>
> On Mon, Mar 28, 2016 at 8:02 PM, Russell Jurney 
> wrote:
>
>> I created a JIRA: https://issues.apache.org/jira/browse/SPARK-14229
>>
>> On Mon, Mar 28, 2016 at 7:43 PM, Russell Jurney > > wrote:
>>
>>> Ted, I am using the .rdd method, see above, but for some reason these
>>> RDDs can't be saved to MongoDB or ElasticSearch.
>>>
>>> I think this is a bug in PySpark/DataFrame. I can't think of another
>>> explanation... somehow DataFrame.rdd RDDs are not able to be stored to an
>>> arbitrary Hadoop OutputFormat. When I do this:
>>>
>>> on_time_lines =
>>> sc.textFile("../data/On_Time_On_Time_Performance_2015.jsonl.gz")
>>> on_time_performance = on_time_lines.map(lambda x: json.loads(x))
>>>
>>>
>>> on_time_performance.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>
>>>
>>> It works. Same data, but loaded as textFile instead of DataFrame (via
>>> json/parquet dataframe loading).
>>>
>>> It is the DataFrame.rdd bit that is broken. I will file a JIRA.
>>>
>>> Does anyone know a workaround?
>>>
>>> On Mon, Mar 28, 2016 at 7:28 PM, Ted Yu  wrote:
>>>
 See this method:

   lazy val rdd: RDD[T] = {

 On Mon, Mar 28, 2016 at 6:30 PM, Russell Jurney <
 russell.jur...@gmail.com> wrote:

> Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD.
> This seems related to DataFrames. Is there a way to convert a DataFrame's
> RDD to a 'normal' RDD?
>
>
> On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney <
> russell.jur...@gmail.com> wrote:
>
>> I filed a JIRA  in the
>> mongo-hadoop project, but I'm curious if anyone else has seen this issue.
>> Anyone have any idea what to do? I can't save to Mongo from PySpark. A
>> contrived example works, but a dataframe does not.
>>
>> I activate pymongo_spark and load a dataframe:
>>
>> import pymongo
>> import pymongo_spark
>> # Important: activate pymongo_spark.
>> pymongo_spark.activate()
>>
>> on_time_dataframe =
>> sqlContext.read.parquet('../data/on_time_performance.parquet')
>>
>> Then I try saving to MongoDB in two ways:
>>
>>
>> on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>
>> on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
>>   path='file://unused',
>>   outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
>>   keyClass='org.apache.hadoop.io.Text',
>>   valueClass='org.apache.hadoop.io.MapWritable',
>>   conf={"mongo.output.uri":
>> "mongodb://localhost:27017/agile_data_science.on_time_performance"}
>> )
>>
>>
>> But I always get this error:
>>
>> In [7]:
>> on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>
>> 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to
>> process : 1
>>
>> 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at
>> PythonRDD.scala:393
>>
>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at
>> PythonRDD.scala:393) with 1 output partitions
>>
>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage:
>> ResultStage 2 (runJob at PythonRDD.scala:393)
>>
>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final
>> stage: List()
>>
>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents: List()
>>
>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage
>> 2 (PythonRDD[13] at RDD at PythonRDD.scala:43), which has no missing 
>> parents
>>
>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored
>> as values in memory (estimated size 19.3 KB, free 249.2 KB)
>>
>> 16/03/28 

Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Benjamin Kim
Hi Mich,

I forgot to mention that - this is the ugly part - the source data provider 
gives us (Windows) pkzip compressed files. Will spark uncompress these 
automatically? I haven’t been able to make it work.

Thanks,
Ben

> On Mar 30, 2016, at 2:27 PM, Mich Talebzadeh  
> wrote:
> 
> Hi Ben,
> 
> Well I have done it for standard csv files downloaded from spreadsheets to 
> staging directory on hdfs and loaded from there.
> 
> First, you may not need to unzip them. databricks (spark-csv) can read them
> (in my case) as zipped files.
> 
> Check this. Mine is slightly different from what you have, First I zip my csv 
> files with bzip2 and load them into hdfs
> 
> #!/bin/ksh
> DIR="/data/stg/accounts/nw/10124772"
> #
> ## Compress the files
> #
> echo `date` " ""===  Started compressing all csv FILEs"
> for FILE in `ls *.csv`
> do
>   /usr/bin/bzip2 ${FILE}
> done
> #
> ## Clear out hdfs staging directory
> #
> echo `date` " ""===  Started deleting old files from hdfs staging 
> directory ${DIR}"
> hdfs dfs -rm -r ${DIR}/*.bz2
> echo `date` " ""===  Started Putting bz2 fileS to hdfs staging directory 
> ${DIR}"
> for FILE in `ls *.bz2`
> do
>   hdfs dfs -copyFromLocal ${FILE} ${DIR}
> done
> echo `date` " ""===  Checking that all files are moved to hdfs staging 
> directory"
> hdfs dfs -ls ${DIR}
> exit 0
> 
> Now you have all your csv files in the staging directory
> 
> import org.apache.spark.sql.functions._
> import java.sql.{Date, Timestamp}
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
> ").collect.foreach(println)
> 
> val df = 
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
> "true").option("header", 
> "true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
> case class Accounts( TransactionDate: String, TransactionType: String, 
> Description: String, Value: Double, Balance: Double, AccountName: String, 
> AccountNumber : String)
> // Map the columns to names
> //
> val a = df.filter(col("Date") > "").map(p => 
> Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
> //
> // Create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
> 
> // Need to create and populate target ORC table nw_10124772 in database 
> accounts.in  Hive
> //
> sql("use accounts")
> //
> // Drop and create table nw_10124772
> //
> sql("DROP TABLE IF EXISTS accounts.nw_10124772")
> var sqltext : String = ""
> sqltext = """
> CREATE TABLE accounts.nw_10124772 (
> TransactionDate    DATE
> ,TransactionType   String
> ,Description   String
> ,Value Double
> ,Balance   Double
> ,AccountName   String
> ,AccountNumber Int
> )
> COMMENT 'from csv file from excel sheet'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> """
> sql(sqltext)
> //
> // Put data in Hive table. Clean up is already done
> //
> sqltext = """
> INSERT INTO TABLE accounts.nw_10124772
> SELECT
>   
> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/'),'-MM-dd'))
>  AS TransactionDate
> , TransactionType
> , Description
> , Value
> , Balance
> , AccountName
> , AccountNumber
> FROM tmp
> """
> sql(sqltext)
> 
> println ("\nFinished at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') ").collect.fore
> 
> Once you store into a some form of table (Parquet, ORC) etc you can do 
> whatever you like with it.
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 30 March 2016 at 22:13, Benjamin Kim  > wrote:
> Hi Mich,
> 
> You are correct. I am talking about the Databricks package spark-csv you have 
> below.
> 
> The files are stored in s3 and I download, unzip, and store each one of them 
> in a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).
> 
> Here is some of the code.
> 
> val filesRdd = sc.parallelize(lFiles, 250)
> filesRdd.foreachPartition(files => {
>   val s3Client = new AmazonS3Client(new 
> EnvironmentVariableCredentialsProvider())
>   files.foreach(file => {
> val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
> val zipFile = new ZipInputStream(s3Object.getObjectContent())
> val csvFile = readZipStream(zipFile)
>   })
> })
> 
> This function does the unzipping and converts to string.
> 
> def readZipStream(stream: ZipInputStream): String = {
>   stream.getNextEntry
>   var stuff = new 

Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Mich Talebzadeh
Hi Ben,

Well I have done it for standard csv files downloaded from spreadsheets to
staging directory on hdfs and loaded from there.

First, you may not need to unzip them. databricks (spark-csv) can read them (in my
case) as zipped files.

Check this. Mine is slightly different from what you have: first I zip my
csv files with bzip2 and load them into hdfs.

#!/bin/ksh
DIR="/data/stg/accounts/nw/10124772"
#
## Compress the files
#
echo `date` " ""===  Started compressing all csv FILEs"
for FILE in `ls *.csv`
do
  /usr/bin/bzip2 ${FILE}
done
#
## Clear out hdfs staging directory
#
echo `date` " ""===  Started deleting old files from hdfs staging
directory ${DIR}"
hdfs dfs -rm -r ${DIR}/*.bz2
echo `date` " ""===  Started Putting bz2 fileS to hdfs staging
directory ${DIR}"
for FILE in `ls *.bz2`
do
  hdfs dfs -copyFromLocal ${FILE} ${DIR}
done
echo `date` " ""===  Checking that all files are moved to hdfs staging
directory"
hdfs dfs -ls ${DIR}
exit 0

Now you have all your csv files in the staging directory

import org.apache.spark.sql.functions._
import java.sql.{Date, Timestamp}
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)

val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header",
"true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
case class Accounts( TransactionDate: String, TransactionType: String,
Description: String, Value: Double, Balance: Double, AccountName: String,
AccountNumber : String)
// Map the columns to names
//
val a = df.filter(col("Date") > "").map(p =>
Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
//
// Create a Spark temporary table
//
a.toDF.registerTempTable("tmp")

// Need to create and populate target ORC table nw_10124772 in database
accounts.in Hive
//
sql("use accounts")
//
// Drop and create table nw_10124772
//
sql("DROP TABLE IF EXISTS accounts.nw_10124772")
var sqltext : String = ""
sqltext = """
CREATE TABLE accounts.nw_10124772 (
TransactionDate    DATE
,TransactionType   String
,Description       String
,Value             Double
,Balance           Double
,AccountName       String
,AccountNumber     Int
)
COMMENT 'from csv file from excel sheet'
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
"""
sql(sqltext)
//
// Put data in Hive table. Clean up is already done
//
sqltext = """
INSERT INTO TABLE accounts.nw_10124772
SELECT

TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/'),'-MM-dd'))
AS TransactionDate
, TransactionType
, Description
, Value
, Balance
, AccountName
, AccountNumber
FROM tmp
"""
sql(sqltext)

println ("\nFinished at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') ").collect.fore

Once you store it in some form of table (Parquet, ORC, etc.) you can do
whatever you like with it.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 30 March 2016 at 22:13, Benjamin Kim  wrote:

> Hi Mich,
>
> You are correct. I am talking about the Databricks package spark-csv you
> have below.
>
> The files are stored in s3 and I download, unzip, and store each one of
> them in a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).
>
> Here is some of the code.
>
> val filesRdd = sc.parallelize(lFiles, 250)
> filesRdd.foreachPartition(files => {
>   val s3Client = new AmazonS3Client(new
> EnvironmentVariableCredentialsProvider())
>   files.foreach(file => {
> val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
> val zipFile = new ZipInputStream(s3Object.getObjectContent())
> val csvFile = readZipStream(zipFile)
>   })
> })
>
> This function does the unzipping and converts to string.
>
> def readZipStream(stream: ZipInputStream): String = {
>   stream.getNextEntry
>   var stuff = new ListBuffer[String]()
>   val scanner = new Scanner(stream)
>   while(scanner.hasNextLine){
> stuff += scanner.nextLine
>   }
>   stuff.toList.mkString("\n")
> }
>
> The next step is to parse the CSV string and convert to a dataframe, which
> will populate a Hive/HBase table.
>
> If you can help, I would be truly grateful.
>
> Thanks,
> Ben
>
>
> On Mar 30, 2016, at 2:06 PM, Mich Talebzadeh 
> wrote:
>
> just to clarify are you talking about databricks csv package.
>
> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
>
> Where are these zipped files? Are they copied to a staging directory in
> hdfs?
>
> HTH
>
> Dr Mich 

Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Benjamin Kim
Hi Mich,

You are correct. I am talking about the Databricks package spark-csv you have 
below.

The files are stored in s3 and I download, unzip, and store each one of them in 
a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).

Here is some of the code.

val filesRdd = sc.parallelize(lFiles, 250)
filesRdd.foreachPartition(files => {
  val s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider())
  files.foreach(file => {
    val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
    val zipFile = new ZipInputStream(s3Object.getObjectContent())
    val csvFile = readZipStream(zipFile)
  })
})

This function does the unzipping and converts to string.

def readZipStream(stream: ZipInputStream): String = {
  stream.getNextEntry
  var stuff = new ListBuffer[String]()
  val scanner = new Scanner(stream)
  while (scanner.hasNextLine) {
    stuff += scanner.nextLine
  }
  stuff.toList.mkString("\n")
}

The next step is to parse the CSV string and convert to a dataframe, which will 
populate a Hive/HBase table.
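
One way to get from those in-memory strings to a DataFrame without writing them back out
(a rough sketch that skips the spark-csv package; it assumes the per-file strings are
produced with map/mapPartitions rather than foreachPartition so they end up in an
RDD[String], and the column names/types below are invented for illustration):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// csvStrings: RDD[String], one whole unzipped CSV file per element
def csvStringsToDF(csvStrings: org.apache.spark.rdd.RDD[String]) = {
  val schema = StructType(Seq(                // invented column names
    StructField("id", StringType),
    StructField("name", StringType),
    StructField("value", StringType)))

  val rows = csvStrings
    .flatMap(_.split("\n").drop(1))           // drop each file's header line
    .map(_.split(",", -1))                    // naive split; no quoted-field handling
    .map(fields => Row(fields(0), fields(1), fields(2)))

  sqlContext.createDataFrame(rows, schema)    // assumes sqlContext is in scope (spark-shell)
}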

If you can help, I would be truly grateful.

Thanks,
Ben


> On Mar 30, 2016, at 2:06 PM, Mich Talebzadeh  
> wrote:
> 
> just to clarify are you talking about databricks csv package.
> 
> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
> 
> Where are these zipped files? Are they copied to a staging directory in hdfs?
> 
> HTH
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 30 March 2016 at 15:17, Benjamin Kim  > wrote:
> I have a quick question. I have downloaded multiple zipped files from S3 and 
> unzipped each one of them into strings. The next step is to parse using a CSV 
> parser. I want to know if there is a way to easily use the spark csv package 
> for this?
> 
> Thanks,
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Mich Talebzadeh
just to clarify are you talking about databricks csv package.

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0

Where are these zipped files? Are they copied to a staging directory in
hdfs?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 30 March 2016 at 15:17, Benjamin Kim  wrote:

> I have a quick question. I have downloaded multiple zipped files from S3
> and unzipped each one of them into strings. The next step is to parse using
> a CSV parser. I want to know if there is a way to easily use the spark csv
> package for this?
>
> Thanks,
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: pyspark read json file with high dimensional sparse data

2016-03-30 Thread Michael Armbrust
You can force the data to be loaded as a sparse map, assuming the key/value
types are consistent. Here is an example.
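
The linked example is not quoted here, but the idea can be sketched as supplying an
explicit schema whose sparse part is a MapType, so only the keys present in each record
are materialized (this assumes the sparse key/value pairs sit under a single JSON field,
here called "features"; the field names and path are assumptions):

import org.apache.spark.sql.types.{DoubleType, MapType, StringType, StructField, StructType}

// Instead of letting schema inference create one column per distinct key,
// read the wide, sparse part of each record as a single map column.
val schema = StructType(Seq(
  StructField("id", StringType),                               // assumed field
  StructField("features", MapType(StringType, DoubleType))))   // assumed sparse field

val df = sqlContext.read.schema(schema).json("/path/to/data.json")   // placeholder path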

On Wed, Mar 30, 2016 at 8:17 AM, Yavuz Nuzumlalı 
wrote:

> Hi all,
>
> I'm trying to read a data inside a json file using
> `SQLContext.read.json()` method.
>
> However, reading operation does not finish. My data is of 29x3100
> dimensions, but it's actually really sparse, so if there is a way to
> directly read json into a sparse dataframe, it would work perfect for me.
>
> What are the alternatives for reading such data into spark?
>
> P.S. : When I try to load first 5 rows, read operation is completed in
> ~2 minutes.
>


Spark 1.5.2 Master OOM

2016-03-30 Thread Yong Zhang
Hi, Sparkers,

Our cluster is running Spark 1.5.2 in Standalone mode. It runs fine for weeks,
but today I found that the master crashed due to OOM.

We have several ETL jobs that run daily on Spark, plus ad hoc jobs. I can see
the "Completed Applications" table grow in the master UI.

Originally I set "export SPARK_DAEMON_MEMORY=1g", as I don't think the
master/worker JVM daemons need much memory. I never hit a Spark master OOM when
we ran version 1.3.1, but suddenly I got it on 1.5.2.

I am not sure if it is due to the growing "Completed Applications" history. I
haven't started the Spark history server, as we don't have that requirement
yet. For now I have changed the daemon memory from 1g to 2g and restarted the
cluster.

Below is the OOM log from the Spark master. In fact, the OOM happened minutes
after a job had just finished. The job finished successfully, as the final HDFS
output was generated around 09:31:00. So in theory there were no active jobs
when the OOM happened, or it was triggered by the completion of that job; I
don't know. But a Spark master OOM is in fact a SPOF for us. Does anyone have
any idea about it?
16/03/30 09:36:40 ERROR akka.ErrorMonitor: Uncaught fatal error from thread
[sparkMaster-akka.remote.default-remote-dispatcher-33] shutting down ActorSystem [sparkMaster]
java.lang.OutOfMemoryError: Java heap space
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2615)
    at java.lang.Class.getDeclaredMethod(Class.java:2007)
    at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1431)
    at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:72)
    at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:494)
    at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.(ObjectStreamClass.java:468)
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
Thanks
Yong  

pyspark read json file with high dimensional sparse data

2016-03-30 Thread Yavuz Nuzumlalı
Hi all,

I'm trying to read a data inside a json file using `SQLContext.read.json()`
method.

However, reading operation does not finish. My data is of 29x3100
dimensions, but it's actually really sparse, so if there is a way to
directly read json into a sparse dataframe, it would work perfect for me.

What are the alternatives for reading such data into spark?

P.S. : When I try to load first 5 rows, read operation is completed in
~2 minutes.


Re: Unable to set cores while submitting Spark job

2016-03-30 Thread Mich Talebzadeh
Hi Ted,

Can I specify the cores as follows, for example 12 cores?

  val conf = new SparkConf().
    setAppName("ImportStat").
    setMaster("local[12]").
    set("spark.driver.allowMultipleContexts", "true").
    set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(conf)


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 30 March 2016 at 14:59, Ted Yu  wrote:

> -c CORES, --cores CORES Total CPU cores to allow Spark applications to
> use on the machine (default: all available); only on worker
>
> bq. sc.getConf().set()
>
> I think you should use this pattern (shown in
> https://spark.apache.org/docs/latest/spark-standalone.html):
>
> val conf = new SparkConf()
>   .setMaster(...)
>   .setAppName(...)
>   .set("spark.cores.max", "1")
> val sc = new SparkContext(conf)
>
>
> On Wed, Mar 30, 2016 at 5:46 AM, vetal king  wrote:
>
>> Hi all,
>>
>> While submitting Spark Job I am am specifying options --executor-cores 1
>> and --driver-cores 1. However, when the job was submitted, the job used all
>> available cores. So I tried to limit the cores within my main function
>> sc.getConf().set("spark.cores.max", "1"); however it still used all
>> available cores
>>
>> I am using Spark in standalone mode (spark://:7077)
>>
>> Any idea what I am missing?
>> Thanks in Advance,
>>
>> Shridhar
>>
>>
>


Re: Unable to Limit UI to localhost interface

2016-03-30 Thread Michael Segel
It sounds like when you start up Spark, it's using 0.0.0.0, which means it will
listen on all interfaces.
You should be able to limit which interface to use.

The weird thing is that if you are specifying the IP address and port, Spark
shouldn't be listening on all of the interfaces for that port.
(This could be a UI bug?)

The other issue: you need to put a firewall in front of your cluster/machine.
This is probably a best-practice issue.



> On Mar 30, 2016, at 12:25 AM, Akhil Das  wrote:
> 
> In your case, you will be able to see the webui (unless restricted with 
> iptables) but you won't be able to submit jobs to that machine from a remote 
> machine since the spark master is spark://127.0.0.1:7077 
> 
> 
> Thanks
> Best Regards
> 
> On Tue, Mar 29, 2016 at 8:12 PM, David O'Gwynn  > wrote:
> /etc/hosts
> 
> 127.0.0.1 localhost
> 
> conf/slaves 
> 127.0.0.1
> 
> 
> On Mon, Mar 28, 2016 at 5:36 PM, Mich Talebzadeh  > wrote:
> in your /etc/hosts what do you have for localhost
> 
> 127.0.0.1 localhost.localdomain localhost
> 
> conf/slave should have one entry in your case
> 
> cat slaves
> # A Spark Worker will be started on each of the machines listed below.
> localhost
> ...
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  
> 
> On 28 March 2016 at 15:32, David O'Gwynn  > wrote:
> Greetings to all,
> 
> I've search around the mailing list, but it would seem that (nearly?) 
> everyone has the opposite problem as mine. I made a stab at looking in the 
> source for an answer, but I figured I might as well see if anyone else has 
> run into the same problem as I.
> 
> I'm trying to limit my Master/Worker UI to run only on localhost. As it 
> stands, I have the following two environment variables set in my spark-env.sh:
> 
> SPARK_LOCAL_IP=127.0.0.1
> SPARK_MASTER_IP=127.0.0.1
> 
> and my slaves file contains one line: 127.0.0.1
> 
> The problem is that when I run "start-all.sh", I can nmap my box's public 
> interface and get the following:
> 
> PORT STATE SERVICE
> 22/tcp   open  ssh
> 8080/tcp open  http-proxy
> 8081/tcp open  blackice-icecap
> 
> Furthermore, I can go to my box's public IP at port 8080 in my browser and 
> get the master node's UI. The UI even reports that the URL/REST URLs to be 
> 127.0.0.1 :
> 
> Spark Master at spark://127.0.0.1:7077 
> URL: spark://127.0.0.1:7077 
> REST URL: spark://127.0.0.1:6066  (cluster mode)
> 
> I'd rather not have spark available in any way to the outside world without 
> an explicit SSH tunnel.
> 
> There are variables to do with setting the Web UI port, but I'm not concerned 
> with the port, only the network interface to which the Web UI binds.
> 
> Any help would be greatly appreciated.
> 
> 
> 
> 



Re: Plot DataFrame with matplotlib

2016-03-30 Thread Yavuz Nuzumlalı
Hi Teng,

Thanks for the answer. I've switched to pandas during the proof-of-concept
process in order to be able to plot graphs easily.

Actually, the pandas DataFrame object itself has `plot` methods, so these
objects can plot themselves easily in most cases (it uses matplotlib
inside).

I wonder if the Spark DataFrame API would consider moving in that direction,
because plotting is really important during the analysis process, and
converting a data frame using the `toPandas()` method would fail for data that
does not fit in memory.

Although I'm not much familiar with internals, I would like to help for
anything if team considers adding such a feature.

On Wed, Mar 23, 2016 at 2:16 PM Teng Qiu  wrote:

> e... then this sounds like a feature requirement for matplotlib, you
> need to make matplotlib's APIs support RDD or spark DataFrame object,
> i checked the API of mplot3d
> (
> http://matplotlib.org/mpl_toolkits/mplot3d/tutorial.html#mpl_toolkits.mplot3d.Axes3D.scatter
> ),
> it only supports "array-like" input data.
>
> so yes, to use matplotlib, you need to take the elements out of RDD,
> and send them to plot API as list object.
>
> 2016-03-23 12:20 GMT+01:00 Yavuz Nuzumlalı :
> > Thanks for help, but the example that you referenced gets the values from
> > RDD as list and plots that list.
> >
> > What I am specifically asking was that is there a convenient way to plot
> a
> > DataFrame object directly?(like pandas DataFrame objects)
> >
> >
> > On Wed, Mar 23, 2016 at 11:47 AM Teng Qiu  wrote:
> >>
> >> not sure about 3d plot, but there is a nice example:
> >>
> >>
> https://github.com/zalando/spark-appliance/blob/master/examples/notebooks/PySpark_sklearn_matplotlib.ipynb
> >>
> >> for plotting rdd or dataframe using matplotlib.
> >>
> >> Am Mittwoch, 23. März 2016 schrieb Yavuz Nuzumlalı :
> >> > Hi all,
> >> > I'm trying to plot the result of a simple PCA operation, but couldn't
> >> > find a clear documentation about plotting data frames.
> >> > Here is the output of my data frame:
> >> > ++
> >> > |pca_features|
> >> > ++
> >> > |[-255.4681508918886,2.9340031372956155,-0.5357914079267039] |
> >> > |[-477.03566189308367,-6.170290817861212,-5.280827588464785] |
> >> > |[-163.13388125540507,-4.571443623272966,-1.2349427928939671]|
> >> > |[-53.721252166903255,0.6162589419996329,-0.39569546286098245]   |
> >> > [-27.97717473880869,0.30883567826481106,-0.11159555340377557]   |
> >> > |[-118.27508063853554,1.3484584740407748,-0.8088790388907207]|
> >> > Values of `pca_features` column is DenseVector s created using
> >> > VectorAssembler.
> >> > How can I draw a simple 3d scatter plot from this data frame?
> >> > Thanks
>


Re: Loading multiple packages while starting spark-shell

2016-03-30 Thread Mustafa Elbehery
Hi,

This worked out ..

Thanks a lot :)


On Wed, Mar 30, 2016 at 4:37 PM Ted Yu  wrote:

> How did you specify the packages ?
>
> See the following from
> https://spark.apache.org/docs/latest/submitting-applications.html :
>
> Users may also include any other dependencies by supplying a
> comma-delimited list of maven coordinates with --packages.
>
> On Wed, Mar 30, 2016 at 7:15 AM, Mustafa Elbehery <
> elbeherymust...@gmail.com> wrote:
>
>> Hi Folks,
>>
>> I am trying to use two Spark packages while working from the shell ..
>> Unfortunately it accepts only one package as parameter and ignore the
>> second.
>>
>> Any suggestion how to work around this ?
>>
>> Regards.
>>
>
>


Re: Loading multiple packages while starting spark-shell

2016-03-30 Thread Ted Yu
How did you specify the packages ?

See the following from
https://spark.apache.org/docs/latest/submitting-applications.html :

Users may also include any other dependencies by supplying a
comma-delimited list of maven coordinates with --packages.
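
For example, a single --packages flag with a comma-separated list (the second package
coordinate here is only illustrative):

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0,com.databricks:spark-avro_2.11:2.0.1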

On Wed, Mar 30, 2016 at 7:15 AM, Mustafa Elbehery  wrote:

> Hi Folks,
>
> I am trying to use two Spark packages while working from the shell ..
> Unfortunately it accepts only one package as parameter and ignore the
> second.
>
> Any suggestion how to work around this ?
>
> Regards.
>


Re: Checkpoint of DStream joined with RDD

2016-03-30 Thread Lubomir Nerad

Hi Ted, all,

do you have any advice regarding my questions in my initial email?

I tried Spark 1.5.2 and 1.6.0 without success. The problem seems to be
that RDDs use some transient fields which are not restored when they
are recovered from checkpoint files. In the case of some RDD
implementations it is the SparkContext, but it can also be an
implementation-specific Configuration object, etc. I see in the sources
that in the case of DStream recovery, the DStreamGraph takes care of
restoring the StreamingContext in all its DStreams. But I haven't found
any similar mechanism for RDDs.

So my question is whether I am doing something wrong or whether this is a bug
in Spark. If the latter, is there some workaround other than implementing a
custom DStream which will return the same RDD every batch interval and
joining at the DStream level instead of the RDD level in transform?
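
For reference, that DStream-level variant does not even need a custom DStream; a rough
sketch using the existing ConstantInputDStream (variable names are assumptions, and
whether it actually sidesteps the recovery problem depends on the same checkpointing
behaviour described above):

import org.apache.spark.streaming.dstream.ConstantInputDStream

// lookupRdd: RDD[(K, V)] built once (e.g. from HBase); eventStream: DStream[(K, W)]
val lookupStream = new ConstantInputDStream(ssc, lookupRdd)   // emits the same RDD every batch

// Joining at the DStream level keeps the lookup side inside the DStreamGraph,
// the part whose context is restored on recovery.
val joined = eventStream.join(lookupStream)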


Thanks,
Lubo

On 18.3.2016 18:36, Ted Yu wrote:

This is the line where NPE came from:

if (conf.get(SCAN) != null) {

So Configuration instance was null.

On Fri, Mar 18, 2016 at 9:58 AM, Lubomir Nerad 
> wrote:


The HBase version is 1.0.1.1.

Thanks,
Lubo


On 18.3.2016 17:29, Ted Yu wrote:

I looked at the places in SparkContext.scala where NewHadoopRDD
is constrcuted.
It seems the Configuration object shouldn't be null.

Which hbase release are you using (so that I can see which line
the NPE came from) ?

Thanks

On Fri, Mar 18, 2016 at 8:05 AM, Lubomir Nerad
> wrote:

Hi,

I tried to replicate the example of joining DStream with
lookup RDD from

http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation.
It works fine, but when I enable checkpointing for the
StreamingContext and let the application to recover from a
previously created checkpoint, I always get an exception
during start and the whole application fails. I tried various
types of lookup RDD, but the result is the same.

Exception in the case of HBase RDD is:

Exception in thread "main" java.lang.NullPointerException
at

org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119)
at
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:109)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at

org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at

org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
at
org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
at
java.util.TimSort.countRunAndMakeAscending(TimSort.java:351)
at java.util.TimSort.sort(TimSort.java:216)
at java.util.Arrays.sort(Arrays.java:1438)
at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
at scala.collection.AbstractSeq.sorted(Seq.scala:40)
at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
at
org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
at

org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
at

org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
at

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at
org.apache.spark.rdd.PairRDDFunctions.join(PairRDDFunctions.scala:650)
at

Does Spark CSV accept a CSV String

2016-03-30 Thread Benjamin Kim
I have a quick question. I have downloaded multiple zipped files from S3 and 
unzipped each one of them into strings. The next step is to parse using a CSV 
parser. I want to know if there is a way to easily use the spark csv package 
for this?

Thanks,
Ben



Loading multiple packages while starting spark-shell

2016-03-30 Thread Mustafa Elbehery
Hi Folks,

I am trying to use two Spark packages while working from the shell ..
Unfortunately it accepts only one package as parameter and ignore the
second.

Any suggestion how to work around this ?

Regards.


Re: spark 1.5.2 - value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey, myData)]

2016-03-30 Thread Ted Yu
Have you tried the following construct ?

new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey()

See core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
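
For filterByRange specifically, the implicit Ordering for the key type has to be visible
at the call site for the implicit conversion to OrderedRDDFunctions to apply. A sketch of
two options (the type and value names follow the thread; how rowKeyOrd is exposed is an
assumption):

// Option 1: bring the Ordering into scope in the second class.
implicit val rowKeyOrdering: Ordering[myKey] = RDDInitializer.rowKeyOrd
val inRange = RDDInitializer.dataRdd.filterByRange(myKeyFirst, myKeyLast)

// Option 2: with that Ordering in scope, the explicit construct above also works.
import org.apache.spark.rdd.OrderedRDDFunctions
val inRange2 = new OrderedRDDFunctions[myKey, myData, (myKey, myData)](RDDInitializer.dataRdd)
  .filterByRange(myKeyFirst, myKeyLast)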

On Wed, Mar 30, 2016 at 5:20 AM, Nirav Patel  wrote:

> Hi, I am trying to use filterByRange feature of spark OrderedRDDFunctions
> in a hope that it will speed up filtering by scanning only required
> partitions.
> I have created Paired RDD with a RangePartitioner in one scala class and
> in another class I am trying to access this RDD and do following:
>
> In first scala class called RDDInitializer  I do:
>
>  implicit val rowKeyOrdering = rowKeyOrd
>
> val repartitionRdd = rowdataRdd.partitionBy(new RangePartitioner(
> minPartitions.toInt, dataRdd, true))
>
> dataRdd  = repartitionRdd.sortByKey()
>
>
> In second scala class I do:
>
> import org.apache.spark.SparkContext._
>
> RDDInitializer.dataRdd.filterByRange(myKeyFirst, myKeyLast)
> But I am getting following compile error:
>
> "value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey,
> myData)]"
>
>
> Looks like I can use all methods of OrderedRDDFunctions inside first
> scala class where implicit rowKeyOrdering is defined but not in second
> class.
>
>
> Please help me resolve this compile error.
>
>
> Thanks
>
> Nirav
>


Re: Unable to set cores while submitting Spark job

2016-03-30 Thread Ted Yu
-c CORES, --cores CORES Total CPU cores to allow Spark applications to use
on the machine (default: all available); only on worker

bq. sc.getConf().set()

I think you should use this pattern (shown in
https://spark.apache.org/docs/latest/spark-standalone.html):

val conf = new SparkConf()
  .setMaster(...)
  .setAppName(...)
  .set("spark.cores.max", "1")
val sc = new SparkContext(conf)


On Wed, Mar 30, 2016 at 5:46 AM, vetal king  wrote:

> Hi all,
>
> While submitting Spark Job I am am specifying options --executor-cores 1
> and --driver-cores 1. However, when the job was submitted, the job used all
> available cores. So I tried to limit the cores within my main function
> sc.getConf().set("spark.cores.max", "1"); however it still used all
> available cores
>
> I am using Spark in standalone mode (spark://:7077)
>
> Any idea what I am missing?
> Thanks in Advance,
>
> Shridhar
>
>


Re: Configuring log4j Spark

2016-03-30 Thread Guillermo Ortiz
I changed the place of --files and works.

 ( IT DOESN'T WORK)
spark-submit  --conf spark.metrics.conf=metrics.properties --name
"myProject" --master yarn-cluster --class myCompany.spark.MyClass *--files
/opt/myProject/conf/log4j.properties* --jars $SPARK_CLASSPATH
--executor-memory 1024m --num-executors 5  --executor-cores 1
--driver-memory 1024m  /opt/myProject/myJar.jar

(IT WORKS)
spark-submit  --conf spark.metrics.conf=metrics.properties --name
"myProject" --master yarn-cluster --class myCompany.spark.MyClass  --jars
$SPARK_CLASSPATH --executor-memory 1024m --num-executors 5
 --executor-cores 1 --driver-memory 1024m *--files
/opt/myProject/conf/log4j.properties*  /opt/myProject/myJar.jar

I think I didn't make any other changes.
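
As an aside, another commonly used way to point the executors at the shipped log4j file
(a sketch, not verified against this exact setup) is to combine --files with
spark.executor.extraJavaOptions / spark.driver.extraJavaOptions:

spark-submit --master yarn-cluster --class myCompany.spark.MyClass \
  --files /opt/myProject/conf/log4j.properties \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  /opt/myProject/myJar.jar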



2016-03-30 15:42 GMT+02:00 Guillermo Ortiz :

> I'm trying to configure log4j in Spark.
>
> spark-submit  --conf spark.metrics.conf=metrics.properties --name
> "myProject" --master yarn-cluster --class myCompany.spark.MyClass *--files
> /opt/myProject/conf/log4j.properties* --jars $SPARK_CLASSPATH
> --executor-memory 1024m --num-executors 5  --executor-cores 1
> --driver-memory 1024m  /opt/myProject/myJar.jar
>
> I have this log4j.properties
> log4j.rootCategory=DEBUG, RollingAppender, myConsoleAppender
> #log4j.logger.mycompany.spark=DEBUG
> log4j.category.myCompany.spark=DEBUG
> spark.log.dir=/opt/myProject/log
> spark.log.file=spark.log
>
> log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
> log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.myConsoleAppender.Target=System.out
> log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c
> - %m%n
>
> log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
> log4j.appender.RollingAppender.MaxFileSize=50MB
> log4j.appender.RollingAppender.MaxBackupIndex=5
> log4j.appender.RollingAppender.layout.ConversionPattern=%d{dd MMM 
> HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n
> log4j.appender.RollingAppender.File=${spark.log.dir}/${spark.log.file}
> log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M -
> %m%n
>
> With this I see the log driver with DEBUG level, but the executors with
> INFO level. Why can't I see the executor logs in INFO level?
> I'm using Spark 1.5.0
>
>
>


Configuring log4j Spark

2016-03-30 Thread Guillermo Ortiz
I'm trying to configure log4j in Spark.

spark-submit  --conf spark.metrics.conf=metrics.properties --name
"myProject" --master yarn-cluster --class myCompany.spark.MyClass *--files
/opt/myProject/conf/log4j.properties* --jars $SPARK_CLASSPATH
--executor-memory 1024m --num-executors 5  --executor-cores 1
--driver-memory 1024m  /opt/myProject/myJar.jar

I have this log4j.properties
log4j.rootCategory=DEBUG, RollingAppender, myConsoleAppender
#log4j.logger.mycompany.spark=DEBUG
log4j.category.myCompany.spark=DEBUG
spark.log.dir=/opt/myProject/log
spark.log.file=spark.log

log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.myConsoleAppender.Target=System.out
log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c -
%m%n

log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingAppender.MaxFileSize=50MB
log4j.appender.RollingAppender.MaxBackupIndex=5
log4j.appender.RollingAppender.layout.ConversionPattern=%d{dd MMM 
HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n
log4j.appender.RollingAppender.File=${spark.log.dir}/${spark.log.file}
log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - %m%n

With this I see the log driver with DEBUG level, but the executors with
INFO level. Why can't I see the executor logs in INFO level?
I'm using Spark 1.5.0


Re: Cached Parquet file paths problem

2016-03-30 Thread psmolinski
bumping up the topic.

For the moment I stay with 1.5.2, but I would like to switch to 1.6.x and
this issue is a blocker.

Thanks,
Piotr



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cached-Parquet-file-paths-problem-tp26576p26637.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Unable to set cores while submitting Spark job

2016-03-30 Thread vetal king
Hi all,

While submitting a Spark job I am specifying the options --executor-cores 1
and --driver-cores 1. However, when the job was submitted, it used all
available cores. So I tried to limit the cores within my main function with
sc.getConf().set("spark.cores.max", "1"); however, it still used all
available cores.

I am using Spark in standalone mode (spark://:7077)

Any idea what I am missing?
Thanks in Advance,

Shridhar


Re: Unable to Limit UI to localhost interface

2016-03-30 Thread David O'Gwynn
Thanks much, Akhil. iptables is certainly a bandaid, but from an OpSec
perspective, it's troubling.

Is there any way to limit which interfaces the WebUI listens on? Is there a
Jetty configuration that I'm missing?

Thanks again for your help,
David

On Wed, Mar 30, 2016 at 2:25 AM, Akhil Das 
wrote:

> In your case, you will be able to see the webui (unless restricted with
> iptables) but you won't be able to submit jobs to that machine from a
> remote machine since the spark master is spark://127.0.0.1:7077
>
> Thanks
> Best Regards
>
> On Tue, Mar 29, 2016 at 8:12 PM, David O'Gwynn  wrote:
>
>> /etc/hosts
>>
>> 127.0.0.1 localhost
>>
>> conf/slaves
>> 127.0.0.1
>>
>>
>> On Mon, Mar 28, 2016 at 5:36 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> in your /etc/hosts what do you have for localhost
>>>
>>> 127.0.0.1 localhost.localdomain localhost
>>>
>>> conf/slave should have one entry in your case
>>>
>>> cat slaves
>>> # A Spark Worker will be started on each of the machines listed below.
>>> localhost
>>> ...
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 28 March 2016 at 15:32, David O'Gwynn  wrote:
>>>
 Greetings to all,

 I've search around the mailing list, but it would seem that (nearly?)
 everyone has the opposite problem as mine. I made a stab at looking in the
 source for an answer, but I figured I might as well see if anyone else has
 run into the same problem as I.

 I'm trying to limit my Master/Worker UI to run only on localhost. As it
 stands, I have the following two environment variables set in my
 spark-env.sh:

 SPARK_LOCAL_IP=127.0.0.1
 SPARK_MASTER_IP=127.0.0.1

 and my slaves file contains one line: 127.0.0.1

 The problem is that when I run "start-all.sh", I can nmap my box's
 public interface and get the following:

 PORT STATE SERVICE
 22/tcp   open  ssh
 8080/tcp open  http-proxy
 8081/tcp open  blackice-icecap

 Furthermore, I can go to my box's public IP at port 8080 in my browser
 and get the master node's UI. The UI even reports that the URL/REST URLs to
 be 127.0.0.1:

 Spark Master at spark://127.0.0.1:7077
 URL: spark://127.0.0.1:7077
 REST URL: spark://127.0.0.1:6066 (cluster mode)

 I'd rather not have spark available in any way to the outside world
 without an explicit SSH tunnel.

 There are variables to do with setting the Web UI port, but I'm not
 concerned with the port, only the network interface to which the Web UI
 binds.

 Any help would be greatly appreciated.


>>>
>>
>


spark 1.5.2 - value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey, myData)]

2016-03-30 Thread Nirav Patel
Hi, I am trying to use the filterByRange feature of Spark's OrderedRDDFunctions
in the hope that it will speed up filtering by scanning only the required
partitions.
I have created a paired RDD with a RangePartitioner in one Scala class, and in
another class I am trying to access this RDD and do the following:

In first scala class called RDDInitializer  I do:

 implicit val rowKeyOrdering = rowKeyOrd

val repartitionRdd = rowdataRdd.partitionBy(new RangePartitioner(
minPartitions.toInt, dataRdd, true))

dataRdd  = repartitionRdd.sortByKey()


In second scala class I do:

import org.apache.spark.SparkContext._

RDDInitializer.dataRdd.filterByRange(myKeyFirst, myKeyLast)
But I am getting the following compile error:

"value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey,
myData)]"


It looks like I can use all methods of OrderedRDDFunctions inside the first Scala
class, where the implicit rowKeyOrdering is defined, but not in the second class.


Please help me resolve this compile error.


Thanks

Nirav
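
For what it's worth, this compile error usually means the implicit conversion that
adds filterByRange (RDD -> OrderedRDDFunctions) cannot be applied because no
implicit Ordering for the key type is in scope at the call site. A sketch, reusing
the names from the post above (myKey, rowKeyOrd, myKeyFirst and myKeyLast are
assumed to be reachable from the second class):

import org.apache.spark.SparkContext._

// Bring an Ordering[myKey] into scope where filterByRange is called; without it
// the implicit conversion to OrderedRDDFunctions does not kick in.
implicit val rowKeyOrdering: Ordering[myKey] = rowKeyOrd

val filtered = RDDInitializer.dataRdd.filterByRange(myKeyFirst, myKeyLast)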




Trouble facing with Timestamp column in SparkR when loading CSV file from S3

2016-03-30 Thread ps30
 I am loading a CSV file from an S3 bucket into a Spark DataFrame using the
read.df() function, from RStudio running on an EC2 cluster. One of the columns
is a timestamp column, and it is being loaded as String.

When I try to cast it to Timestamp, all the values become null. My ultimate aim
is to compute the difference in seconds between the timestamp values of
consecutive rows, and I want to do this on the Spark DataFrame rather than an R
data frame, since the latter is comparatively slow. Any help would be
appreciated!
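
The usual cause of this symptom is that the strings do not match the default
"yyyy-MM-dd HH:mm:ss" timestamp format, so the cast returns null; parsing with an
explicit format string normally works. A sketch of the approach in Scala (the
column name and format pattern below are examples; SparkR exposes a similar
unix_timestamp function):

import org.apache.spark.sql.functions.unix_timestamp

// Parse the string column with its actual pattern, then cast the result to timestamp.
val withTs = df.withColumn(
  "event_time",
  unix_timestamp(df("timestamp_str"), "MM/dd/yyyy HH:mm:ss").cast("timestamp"))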




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Trouble-facing-with-Timestamp-column-in-SparkR-when-loading-CSV-file-from-S3-tp26636.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming UI duration numbers mismatch

2016-03-30 Thread Jatin Kumar
Hello Jean,

Were you able to reproduce the problem? I couldn't find any documentation
on whether the two numbers have different meanings.

--
Thanks
Jatin

On Thu, Mar 24, 2016 at 1:43 AM, Jean-Baptiste Onofré 
wrote:

> Hi Jatin,
>
> I will reproduce tomorrow and take a look.
>
> Did you already create a Jira about that (I don't think so) ? If I
> reproduce the problem (and it's really a problem), then I will create one
> for you.
>
> Thanks,
> Regards
> JB
>
> On 03/23/2016 08:20 PM, Jatin Kumar wrote:
>
>> Hello,
>>
>> Can someone please provide some help on the below issue?
>>
>> --
>> Thanks
>> Jatin
>>
>> On Tue, Mar 22, 2016 at 3:30 PM, Jatin Kumar > > wrote:
>>
>> Hello all,
>>
>> I am running spark streaming application and the duration numbers on
>> batch page and job page don't match. Please find attached
>> screenshots of the same.
>>
>> IMO processing time on batch page at top should be sum of durations
>> of all jobs and similarly the duration of a job reported on batch
>> page should be sum of durations of stages of that job.
>>
>> --
>> Thanks
>> Jatin
>>
>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: aggregateByKey on PairRDD

2016-03-30 Thread Daniel Haviv
Hi,
shouldn't groupByKey be avoided (
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html)
?


Thank you,
Daniel

On Wed, Mar 30, 2016 at 9:01 AM, Akhil Das 
wrote:

> Isn't it what tempRDD.groupByKey does?
>
> Thanks
> Best Regards
>
> On Wed, Mar 30, 2016 at 7:36 AM, Suniti Singh 
> wrote:
>
>> Hi All,
>>
>> I have an RDD having the data in  the following form :
>>
>> tempRDD: RDD[(String, (String, String))]
>>
>> (brand , (product, key))
>>
>> ("amazon",("book1","tech"))
>>
>> ("eBay",("book1","tech"))
>>
>> ("barns",("book","tech"))
>>
>> ("amazon",("book2","tech"))
>>
>>
>> I would like to group the data by Brand and would like to get the result
>> set in the following format :
>>
>> resultSetRDD : RDD[(String, List[(String), (String)]
>>
>> I tried using aggregateByKey but I am not quite getting how to achieve
>> this. Or is there any other way to achieve it?
>>
>> val resultSetRDD  = tempRDD.aggregateByKey("")({case (aggr , value) =>
>> aggr + String.valueOf(value) + ","}, (aggr1, aggr2) => aggr1 + aggr2)
>>
>> resultSetRDD = (amazon,("book1","tech"),("book2","tech"))
>>
>> Thanks,
>>
>> Suniti
>>
>
>
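
For what it's worth, a sketch of an aggregateByKey variant that produces the shape
asked for, RDD[(String, List[(String, String)])] (types follow the question above).
Note that, like groupByKey, it still moves every value for a key to a single place,
so it does not give the data reduction the linked article is about:

import org.apache.spark.rdd.RDD

val resultSetRDD: RDD[(String, List[(String, String)])] =
  tempRDD.aggregateByKey(List.empty[(String, String)])(
    (acc, value) => value :: acc,     // add a value to the per-partition list
    (acc1, acc2) => acc1 ::: acc2)    // merge lists from different partitions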


Checkpoints in Spark

2016-03-30 Thread Guillermo Ortiz
I'm curious about what kind of things are saved in the checkpoints.

I just changed the number of executors when submitting the application, and the
change didn't take effect until I removed the checkpoint. I guess that if I'm
using log4j.properties and want to change it, I have to remove the checkpoint as
well.

When you need to change your code and don't want to lose any data, is there an
easy way to make that change?
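
For reference, the streaming checkpoint stores the serialized DStream graph
together with the configuration and the not-yet-completed batches, which is why
changes to code or configuration generally only take effect once the checkpoint
directory is cleared. The documented options for upgrading code without losing
data are running the new application in parallel, or stopping the old one
gracefully (ssc.stop(stopSparkContext = true, stopGracefully = true)) so buffered
data is drained first. A minimal sketch of the usual recovery pattern (the
checkpoint path is an example):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpoint-example")   // placeholder
  val ssc = new StreamingContext(conf, Seconds(10))
  // ... define the DStream graph here ...
  ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")            // example path
  ssc
}

val ssc = StreamingContext.getOrCreate("hdfs:///tmp/streaming-checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()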


Re: Unit testing framework for Spark Jobs?

2016-03-30 Thread Lars Albertsson
Thanks!

It is on my backlog to write a couple of blog posts on the topic, and
eventually some example code, but I am currently busy with clients.

Thanks for the pointer to Eventually - I was unaware. Fast exit on
exception would be a useful addition, indeed.

Lars Albertsson
Data engineering consultant
www.mapflat.com
+46 70 7687109

On Mon, Mar 28, 2016 at 2:00 PM, Steve Loughran 
wrote:
> this is a good summary -Have you thought of publishing it at the end of a
URL for others to refer to
>
>> On 18 Mar 2016, at 07:05, Lars Albertsson  wrote:
>>
>> I would recommend against writing unit tests for Spark programs, and
>> instead focus on integration tests of jobs or pipelines of several
>> jobs. You can still use a unit test framework to execute them. Perhaps
>> this is what you meant.
>>
>> You can use any of the popular unit test frameworks to drive your
>> tests, e.g. JUnit, Scalatest, Specs2. I prefer Scalatest, since it
>> gives you choice of TDD vs BDD, and it is also well integrated with
>> IntelliJ.
>>
>> I would also recommend against using testing frameworks tied to a
>> processing technology, such as Spark Testing Base. Although it does
>> seem well crafted, and makes it easy to get started with testing,
>> there are drawbacks:
>>
>> 1. I/O routines are not tested. Bundled test frameworks typically do
>> not materialise datasets on storage, but pass them directly in memory.
>> (I have not verified this for Spark Testing Base, but it looks so.)
>> I/O routines are therefore not exercised, and they often hide bugs,
>> e.g. related to serialisation.
>>
>> 2. You create a strong coupling between processing technology and your
>> tests. If you decide to change processing technology (which can happen
>> soon in this fast paced world...), you need to rewrite your tests.
>> Therefore, during a migration process, the tests cannot detect bugs
>> introduced in migration, and help you migrate fast.
>>
>> I recommend that you instead materialise input datasets on local disk,
>> run your Spark job, which writes output datasets to local disk, read
>> output from disk, and verify the results. You can still use Spark
>> routines to read and write input and output datasets. A Spark context
>> is expensive to create, so for speed, I would recommend reusing the
>> Spark context between input generation, running the job, and reading
>> output.
>>
>> This is easy to set up, so you don't need a dedicated framework for
>> it. Just put your common boilerplate in a shared test trait or base
>> class.
>>
>> In the future, when you want to replace your Spark job with something
>> shinier, you can still use the old tests, and only replace the part
>> that runs your job, giving you some protection from regression bugs.
>>
>>
>> Testing Spark Streaming applications is a different beast, and you can
>> probably not reuse much from your batch testing.
>>
>> For testing streaming applications, I recommend that you run your
>> application inside a unit test framework, e.g, Scalatest, and have the
>> test setup create a fixture that includes your input and output
>> components. For example, if your streaming application consumes from
>> Kafka and updates tables in Cassandra, spin up single node instances
>> of Kafka and Cassandra on your local machine, and connect your
>> application to them. Then feed input to a Kafka topic, and wait for
>> the result to appear in Cassandra.
>>
>> With this setup, your application still runs in Scalatest, the tests
>> run without custom setup in maven/sbt/gradle, and you can easily run
>> and debug inside IntelliJ.
>>
>> Docker is suitable for spinning up external components. If you use
>> Kafka, the Docker image spotify/kafka is useful, since it bundles
>> Zookeeper.
>>
>> When waiting for output to appear, don't sleep for a long time and
>> then check, since it will slow down your tests. Instead enter a loop
>> where you poll for the results and sleep for a few milliseconds in
>> between, with a long timeout (~30s) before the test fails with a
>> timeout.
>
> org.scalatest.concurrent.Eventually is your friend there
>
> eventually(stdTimeout, stdInterval) {
> listRestAPIApplications(connector, webUI, true) should
contain(expectedAppId)
> }
>
> It has good exponential backoff, for fast initial success without using
too much CPU later, and is simple to use
>
> If it has weaknesses in my tests, they are
>
> 1. it will retry on all exceptions, rather than just on assertions. If there's a
bug in the test code then it manifests as a timeout. (I think I could play
with Suite.anExceptionThatShouldCauseAnAbort() here.)
> 2. its timeout action is simply to rethrow the fault; I like to exec a
closure to grab more diagnostics
> 3. It doesn't support some fail-fast exception which your code can raise
to indicate that the desired state is never going to be reached, and so the
test should fail fast. Here a new exception and another entry in
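
For reference, a self-contained sketch of the poll-with-timeout pattern discussed
above, using ScalaTest's Eventually (the timeout and interval values are arbitrary;
listRestAPIApplications, connector, webUI and expectedAppId are the names from the
quoted snippet):

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Millis, Seconds, Span}

// Keep polling until the assertion passes, or fail after 30 seconds.
eventually(timeout(Span(30, Seconds)), interval(Span(50, Millis))) {
  val apps = listRestAPIApplications(connector, webUI, true)
  assert(apps.contains(expectedAppId))
}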

Re: Running Spark on Yarn

2016-03-30 Thread Vineet Mishra
RM NM logs traced below,

RM -->

2016-03-30 14:59:15,498 INFO
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher:
Setting up container Container: [ContainerId:
container_1459326455972_0004_01_01, NodeId: myhost:60653,
NodeHttpAddress: myhost:8042, Resource: , Priority:
0, Token: Token { kind: ContainerToken, service: 10.20.53.123:60653 }, ]
for AM appattempt_1459326455972_0004_01
2016-03-30 14:59:15,498 INFO
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher:
Command to launch container container_1459326455972_0004_01_01 :
{{JAVA_HOME}}/bin/java,-server,-Xmx512m,-Djava.io.tmpdir={{PWD}}/tmp,-Dspark.yarn.app.container.log.dir=,-XX:MaxPermSize=256m,org.apache.spark.deploy.yarn.ExecutorLauncher,--arg,'
10.20.53.123:45379
',--executor-memory,1024m,--executor-cores,1,--properties-file,{{PWD}}/__spark_conf__/__spark_conf__.properties,1>,/stdout,2>,/stderr
2016-03-30 14:59:15,498 INFO
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager:
Create AMRMToken for ApplicationAttempt:
appattempt_1459326455972_0004_01
2016-03-30 14:59:15,498 INFO
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager:
Creating password for appattempt_1459326455972_0004_01
2016-03-30 14:59:15,533 INFO
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher: Done
launching container Container: [ContainerId:
container_1459326455972_0004_01_01, NodeId: myhost:60653,
NodeHttpAddress: myhost:8042, Resource: , Priority:
0, Token: Token { kind: ContainerToken, service: 10.20.53.123:60653 }, ]
for AM appattempt_1459326455972_0004_01
2016-03-30 14:59:15,533 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
appattempt_1459326455972_0004_01 State change from ALLOCATED to LAUNCHED
2016-03-30 14:59:16,437 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
container_1459326455972_0004_01_01 Container Transitioned from ACQUIRED
to RUNNING
2016-03-30 14:59:28,514 INFO SecurityLogger.org.apache.hadoop.ipc.Server:
Auth successful for appattempt_1459326455972_0004_01 (auth:SIMPLE)
2016-03-30 14:59:28,527 INFO
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService: AM
registration appattempt_1459326455972_0004_01
2016-03-30 14:59:28,527 INFO
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=myhost
IP=10.20.53.123 OPERATION=Register App Master
TARGET=ApplicationMasterService RESULT=SUCCESS
APPID=application_1459326455972_0004
APPATTEMPTID=appattempt_1459326455972_0004_01
2016-03-30 14:59:28,527 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
appattempt_1459326455972_0004_01 State change from LAUNCHED to RUNNING
2016-03-30 14:59:28,528 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl:
application_1459326455972_0004 State change from ACCEPTED to RUNNING
2016-03-30 14:59:29,456 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
container_1459326455972_0004_01_02 Container Transitioned from NEW to
ALLOCATED
2016-03-30 14:59:29,457 INFO
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger:
USER=myhost OPERATION=AM
Allocated Container TARGET=SchedulerApp RESULT=SUCCESS
APPID=application_1459326455972_0004
CONTAINERID=container_1459326455972_0004_01_02
2016-03-30 14:59:29,457 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode:
Assigned container container_1459326455972_0004_01_02 of capacity
 on host myhost:60653, which has 2 containers,
 used and  available after
allocation
2016-03-30 14:59:30,121 INFO
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM:
Sending NMToken for nodeId : myhost:60653 for container :
container_1459326455972_0004_01_02
2016-03-30 14:59:30,122 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
container_1459326455972_0004_01_02 Container Transitioned from
ALLOCATED to ACQUIRED
2016-03-30 14:59:30,458 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt:
Making reservation: node=myhost app_id=application_1459326455972_0004
2016-03-30 14:59:30,458 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
container_1459326455972_0004_01_03 Container Transitioned from NEW to
RESERVED
2016-03-30 14:59:30,458 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode:
Reserved container container_1459326455972_0004_01_03 on node host:
myhost:60653 #containers=2 available=1468 used=2560 for application
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt@57cf4903
2016-03-30 14:59:31,460 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl:
container_1459326455972_0004_01_02 

Re: Null pointer exception when using com.databricks.spark.csv

2016-03-30 Thread Steve Loughran

On 30 Mar 2016, at 04:44, Selvam Raman 
> wrote:

Hi,

i am using spark 1.6.0 prebuilt hadoop 2.6.0 version in my windows machine.

i was trying to use databricks csv format to read csv file. i used the below 
command.



I got null pointer exception. Any help would be greatly appreciated.



The issue is Hadoop and its need on Windows for some native libs, compounded by
bad error handling of the situation.

In Hadoop 2.8+ the NPE is replaced by a warning and a link to a page telling
you what to do:

https://wiki.apache.org/hadoop/WindowsProblems
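
For a quick local workaround, pointing Hadoop at a directory that contains
bin\winutils.exe before anything touches the filesystem APIs is usually enough.
A sketch, assuming an existing SparkContext sc (the paths are examples and the
winutils.exe must match your Hadoop version):

// Must run before the first Hadoop filesystem access.
System.setProperty("hadoop.home.dir", "C:\\hadoop")

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("C:\\data\\sample.csv")   // example path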



Re: SparkML RandomForest java.lang.StackOverflowError

2016-03-30 Thread Eugene Morozov
One more thing.

With the increased stack size it has already completed twice more, but now I see
this in the log:

[dispatcher-event-loop-1] WARN  o.a.spark.scheduler.TaskSetManager - Stage
24860 contains a task of very large size (157 KB). The maximum recommended
task size is 100 KB.

The size of the task increases over time; when the warning first appeared it was
around 100 KB.

The time to complete collectAsMap at DecisionTree.scala:651 has also increased,
from 8 seconds at the beginning of training to 20-24 seconds now.

--
Be well!
Jean Morozov

On Wed, Mar 30, 2016 at 12:14 AM, Eugene Morozov  wrote:

> Joseph,
>
> I'm using 1.6.0.
>
> --
> Be well!
> Jean Morozov
>
> On Tue, Mar 29, 2016 at 10:09 PM, Joseph Bradley 
> wrote:
>
>> First thought: 70K features is *a lot* for the MLlib implementation (and
>> any PLANET-like implementation)
>>
>> Using fewer partitions is a good idea.
>>
>> Which Spark version was this on?
>>
>> On Tue, Mar 29, 2016 at 5:21 AM, Eugene Morozov <
>> evgeny.a.moro...@gmail.com> wrote:
>>
>>> The questions I have in mind:
>>>
>>> Is it something that one might expect? From the stack trace itself it's
>>> not clear where it comes from.
>>> Is it an already known bug? Although I haven't found anything like that.
>>> Is it possible to configure something to workaround / avoid this?
>>>
>>> I'm not sure it's the right thing to do, but I've
>>> increased thread stack size 10 times (to 80MB)
>>> reduced default parallelism 10 times (only 20 cores are available)
>>>
>>> Thank you in advance.
>>>
>>> --
>>> Be well!
>>> Jean Morozov
>>>
>>> On Tue, Mar 29, 2016 at 1:12 PM, Eugene Morozov <
>>> evgeny.a.moro...@gmail.com> wrote:
>>>
 Hi,

 I have a web service that provides rest api to train random forest
 algo.
 I train random forest on a 5 nodes spark cluster with enough memory -
 everything is cached (~22 GB).
 On a small datasets up to 100k samples everything is fine, but with the
 biggest one (400k samples and ~70k features) I'm stuck with
 StackOverflowError.

 Additional options for my web service
 spark.executor.extraJavaOptions="-XX:ThreadStackSize=8192"
 spark.default.parallelism = 200.

 On a 400k samples dataset
 - (with default thread stack size) it took 4 hours of training to get
 the error.
 - with increased stack size it took 60 hours to hit it.
 I can increase it, but it's hard to say what amount of memory it needs
 and it's applied to all of the treads and might waste a lot of memory.

 I'm looking at different stages at event timeline now and see that task
 deserialization time gradually increases. And at the end task
 deserialization time is roughly same as executor computing time.

 Code I use to train model:

 int MAX_BINS = 16;
 int NUM_CLASSES = 0;
 double MIN_INFO_GAIN = 0.0;
 int MAX_MEMORY_IN_MB = 256;
 double SUBSAMPLING_RATE = 1.0;
 boolean USE_NODEID_CACHE = true;
 int CHECKPOINT_INTERVAL = 10;
 int RANDOM_SEED = 12345;

 int NODE_SIZE = 5;
 int maxDepth = 30;
 int numTrees = 50;
 Strategy strategy = new Strategy(Algo.Regression(), Variance.instance(), maxDepth,
     NUM_CLASSES, MAX_BINS, QuantileStrategy.Sort(),
     new scala.collection.immutable.HashMap<>(), NODE_SIZE, MIN_INFO_GAIN,
     MAX_MEMORY_IN_MB, SUBSAMPLING_RATE, USE_NODEID_CACHE, CHECKPOINT_INTERVAL);
 RandomForestModel model = RandomForest.trainRegressor(labeledPoints.rdd(),
     strategy, numTrees, "auto", RANDOM_SEED);


 Any advice would be highly appreciated.

 The exception (~3000 lines long):
  java.lang.StackOverflowError
 at
 java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2320)
 at
 java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2333)
 at
 java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2828)
 at
 java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1453)
 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1512)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 scala.collection.immutable.$colon$colon.readObject(List.scala:366)
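
One thing that may be worth checking for the training code quoted above: if I read
the MLlib sources correctly, the node-id-cache checkpointing requested via
CHECKPOINT_INTERVAL only happens when a checkpoint directory has been set on the
SparkContext, and that checkpointing is what truncates the lineage that otherwise
keeps growing (which would fit the increasing task size and the ever-deeper
(de)serialization stacks). A one-line sketch, with an example path:

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // example directory on a shared filesystem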

Re: Sending events to Kafka from spark job

2016-03-30 Thread أنس الليثي
Dear Andy,

As far as I understand, transformations are applied to the RDDs, not to the data,
and I need to send the actual data to Kafka. So I think I have to perform at
least one action to make Spark actually load the data.

Kindly correct me if I have misunderstood this.

Best regards.

On 29 March 2016 at 19:40, Andy Davidson 
wrote:

> Hi Fanoos
>
> I would be careful about using collect(). You need to make sure you local
> computer has enough memory to hold your entire data set.
>
> Eventually I will need to do something similar. I have to written the code
> yet. My plan is to load the data into a data frame and then write a UDF
> that actually publishes the Kafka
>
> If you are using RDD’s you could use map() or some other transform to
> cause the data to be published
>
> Andy
>
> From: fanooos 
> Date: Tuesday, March 29, 2016 at 4:26 AM
> To: "user @spark" 
> Subject: Re: Sending events to Kafka from spark job
>
> I think I find a solution but I have no idea how this affects the execution
> of the application.
>
> At the end of the script I added  a sleep statement.
>
> import time
> time.sleep(1)
>
>
> This solved the problem.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-events-to-Kafka-from-spark-job-tp26622p26624.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


-- 
Anas Rabei
Senior Software Developer
Mubasher.info
anas.ra...@mubasher.info
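
For what it's worth, the usual pattern here is to publish from inside an action
such as foreachPartition, creating one producer per partition on the executor side
so the (non-serializable) producer never has to be shipped from the driver. A
sketch using the Kafka 0.8.2+ Java client; the broker address and topic name are
placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.rdd.RDD

def publish(rdd: RDD[String]): Unit =
  rdd.foreachPartition { partition =>
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")   // placeholder
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach(msg => producer.send(new ProducerRecord[String, String]("events", msg)))
    producer.close()
  }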


Re: Spark streaming spilling all the data to disk even if memory available

2016-03-30 Thread Akhil Das
Can you elaborate more on where you are streaming the data from and what
type of consumer you are using, etc.?

Thanks
Best Regards

On Tue, Mar 29, 2016 at 6:10 PM, Mayur Mohite 
wrote:

> Hi,
>
> We are running spark streaming app on a single machine and we have
> configured spark executor memory to 30G.
> We noticed that after running the app for 12 hours, spark streaming
> started spilling ALL the data to disk even though we have configured
> sufficient memory for spark to use for storage.
>
> -Mayur
>
>


Re: Unable to Limit UI to localhost interface

2016-03-30 Thread Akhil Das
In your case, you will be able to see the webui (unless restricted with
iptables) but you won't be able to submit jobs to that machine from a
remote machine since the spark master is spark://127.0.0.1:7077

Thanks
Best Regards

On Tue, Mar 29, 2016 at 8:12 PM, David O'Gwynn  wrote:

> /etc/hosts
>
> 127.0.0.1 localhost
>
> conf/slaves
> 127.0.0.1
>
>
> On Mon, Mar 28, 2016 at 5:36 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> in your /etc/hosts what do you have for localhost
>>
>> 127.0.0.1 localhost.localdomain localhost
>>
>> conf/slave should have one entry in your case
>>
>> cat slaves
>> # A Spark Worker will be started on each of the machines listed below.
>> localhost
>> ...
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 28 March 2016 at 15:32, David O'Gwynn  wrote:
>>
>>> Greetings to all,
>>>
>>> I've search around the mailing list, but it would seem that (nearly?)
>>> everyone has the opposite problem as mine. I made a stab at looking in the
>>> source for an answer, but I figured I might as well see if anyone else has
>>> run into the same problem as I.
>>>
>>> I'm trying to limit my Master/Worker UI to run only on localhost. As it
>>> stands, I have the following two environment variables set in my
>>> spark-env.sh:
>>>
>>> SPARK_LOCAL_IP=127.0.0.1
>>> SPARK_MASTER_IP=127.0.0.1
>>>
>>> and my slaves file contains one line: 127.0.0.1
>>>
>>> The problem is that when I run "start-all.sh", I can nmap my box's
>>> public interface and get the following:
>>>
>>> PORT STATE SERVICE
>>> 22/tcp   open  ssh
>>> 8080/tcp open  http-proxy
>>> 8081/tcp open  blackice-icecap
>>>
>>> Furthermore, I can go to my box's public IP at port 8080 in my browser
>>> and get the master node's UI. The UI even reports that the URL/REST URLs to
>>> be 127.0.0.1:
>>>
>>> Spark Master at spark://127.0.0.1:7077
>>> URL: spark://127.0.0.1:7077
>>> REST URL: spark://127.0.0.1:6066 (cluster mode)
>>>
>>> I'd rather not have spark available in any way to the outside world
>>> without an explicit SSH tunnel.
>>>
>>> There are variables to do with setting the Web UI port, but I'm not
>>> concerned with the port, only the network interface to which the Web UI
>>> binds.
>>>
>>> Any help would be greatly appreciated.
>>>
>>>
>>
>


Re: Master options Cluster/Client descrepencies.

2016-03-30 Thread Akhil Das
Have a look at
http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211

Thanks
Best Regards

On Wed, Mar 30, 2016 at 12:09 AM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

>
> Hi All,
>
> I have written a spark program on my dev box ,
>IDE:Intellij
>scala version:2.11.7
>spark verison:1.6.1
>
> It runs fine from the IDE when I provide proper input and output paths and
> the master.
>
> But when I try to deploy the code on my cluster, which is set up as below,
>
>Spark version:1.6.1
> built from source pkg using scala 2.11
> But when I run spark-shell on the cluster, the Scala version reported is
> 2.10.5
>  hadoop yarn cluster 2.6.0
>
> and with additional options,
>
> --executor-memory
> --total-executor-cores
> --deploy-mode cluster/client
> --master yarn
>
> i get Exception in thread "main" java.lang.NoSuchMethodError:
> scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
> at com.movoto.SparkPost$.main(SparkPost.scala:36)
> at com.movoto.SparkPost.main(SparkPost.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> I understand this to be a Scala version issue, as I have faced this before.
>
> Is there something I should change to get the same
> program running on the cluster?
>
> Regards,
> Satyajit.
>
>


Re: aggregateByKey on PairRDD

2016-03-30 Thread Akhil Das
Isn't it what tempRDD.groupByKey does?

Thanks
Best Regards

On Wed, Mar 30, 2016 at 7:36 AM, Suniti Singh 
wrote:

> Hi All,
>
> I have an RDD having the data in  the following form :
>
> tempRDD: RDD[(String, (String, String))]
>
> (brand , (product, key))
>
> ("amazon",("book1","tech"))
>
> ("eBay",("book1","tech"))
>
> ("barns",("book","tech"))
>
> ("amazon",("book2","tech"))
>
>
> I would like to group the data by Brand and would like to get the result
> set in the following format :
>
> resultSetRDD : RDD[(String, List[(String), (String)]
>
> I tried using aggregateByKey but I am not quite getting how to achieve
> this. Or is there any other way to achieve it?
>
> val resultSetRDD  = tempRDD.aggregateByKey("")({case (aggr , value) =>
> aggr + String.valueOf(value) + ","}, (aggr1, aggr2) => aggr1 + aggr2)
>
> resultSetRDD = (amazon,("book1","tech"),("book2","tech"))
>
> Thanks,
>
> Suniti
>


Re: Null pointer exception when using com.databricks.spark.csv

2016-03-30 Thread Akhil Das
Looks like the winutils.exe is missing from the environment, See
https://issues.apache.org/jira/browse/SPARK-2356

Thanks
Best Regards

On Wed, Mar 30, 2016 at 10:44 AM, Selvam Raman  wrote:

> Hi,
>
> i am using spark 1.6.0 prebuilt hadoop 2.6.0 version in my windows machine.
>
> i was trying to use databricks csv format to read csv file. i used the
> below command.
>
> [image: Inline image 1]
>
> I got null pointer exception. Any help would be greatly appreciated.
>
> [image: Inline image 2]
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: data frame problem preserving sort order with repartition() and coalesce()

2016-03-30 Thread Takeshi Yamamuro
Hi,

"csvDF = csvDF.sort(orderByColName, ascending=False)" repartitions DF by
using RangePartitioner
(#partitions depends on "spark.sql.shuffle.partitions").
Seems, in your case, some empty partitions were removed, then you got 17
paritions.

// maropu

On Wed, Mar 30, 2016 at 6:49 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I have a requirement to write my results out into a series of CSV files.
> No file may have more than 100 rows of data. In the past my data was not
> sorted, and I was able to use repartition() or coalesce() to ensure the
> file length requirement.
>
> I realize that repartition() causes the data to be shuffled, and it appears
> that changes the data ordering, so I sort the repartitioned data again.
>
> What is really strange is that I no longer get the number of output files I am
> expecting, and the number-of-lines constraint is now violated.
>
> I am using spark-1.6.1
>
> Andy
>
> $ for i in topTags_CSV/*.csv; do wc -l $i; done
>
>   19 topTags_CSV/part-0.csv
>
>   19 topTags_CSV/part-1.csv
>
>   20 topTags_CSV/part-2.csv
>
>   19 topTags_CSV/part-3.csv
>
>   22 topTags_CSV/part-4.csv
>
>   19 topTags_CSV/part-5.csv
>
>   26 topTags_CSV/part-6.csv
>
>   18 topTags_CSV/part-7.csv
>
>   12 topTags_CSV/part-8.csv
>
>   25 topTags_CSV/part-9.csv
>
>   32 topTags_CSV/part-00010.csv
>
>   53 topTags_CSV/part-00011.csv
>
>   89 topTags_CSV/part-00012.csv
>
>  146 topTags_CSV/part-00013.csv
>
>  387 topTags_CSV/part-00014.csv
>
> 2708 topTags_CSV/part-00015.csv
>
>1 topTags_CSV/part-00016.csv
>
> $
>
> numRowsPerCSVFile = 100
>
> numRows = resultDF.count()
>
> quotient, remander = divmod(numRows, numRowsPerCSVFile)
>
> numPartitions = (quotient + 1) if remander > 0 else quotient
>
>
>
> debugStr = ("numRows:{0} quotient:{1} remander:{2} repartition({3})"
>
> .format(numRows, quotient, remander, numPartitions))
>
> print(debugStr)
>
>
>
> csvDF = resultDF.coalesce(numPartitions)
>
>
>
> orderByColName = "count"
>
> csvDF = csvDF.sort(orderByColName, ascending=False)
>
> headerArg = 'true'# if headers else 'false'
>
> csvDF.write.save(outputDir, 'com.databricks.spark.csv', header=headerArg)
>
> renamePartFiles(outputDir)
>
> numRows:3598 quotient:35 remander:98 repartition(36)
>
>
>
>
>


-- 
---
Takeshi Yamamuro
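
Following on from the explanation above: since sort() shuffles into
spark.sql.shuffle.partitions range partitions, one option is to set that value to
the desired file count before sorting and drop the later coalesce. The range
partitioner balances partitions only approximately (it samples the data), so the
100-row bound is close to but not strictly guaranteed. A sketch in Scala, assuming
an existing SQLContext sqlContext, the resultDF and outputDir from the original
snippet, and a "count" column:

val numRowsPerFile = 100
val numPartitions = math.ceil(resultDF.count().toDouble / numRowsPerFile).toInt

sqlContext.setConf("spark.sql.shuffle.partitions", numPartitions.toString)

val sorted = resultDF.sort(resultDF("count").desc)
sorted.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save(outputDir)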


Re: Null pointer exception when using com.databricks.spark.csv

2016-03-30 Thread Selvam Raman
Hi,

I am able to load and extract the data; the problem only occurs when I use
this Databricks library.

thanks,
selvam R

On Wed, Mar 30, 2016 at 9:33 AM, Hyukjin Kwon  wrote:

> Hi,
>
> I guess this is not a CSV-datasource specific problem.
>
> Does loading any file (eg. textFile()) work as well?
>
> I think this is related with this thread,
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-example-scala-application-using-spark-submit-td10056.html
> .
>
>
> 2016-03-30 12:44 GMT+09:00 Selvam Raman :
>
>> Hi,
>>
>> i am using spark 1.6.0 prebuilt hadoop 2.6.0 version in my windows
>> machine.
>>
>> i was trying to use databricks csv format to read csv file. i used the
>> below command.
>>
>> [image: Inline image 1]
>>
>> I got null pointer exception. Any help would be greatly appreciated.
>>
>> [image: Inline image 2]
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"