Re: Triggering sql on AWS S3 via Apache Spark

2018-10-23 Thread Divya Gehlot
Hi Omer,
Here are a couple of solutions that you can implement for your use case:
*Option 1:*
You can mount the S3 bucket as a local file system.
Here are the details:
https://cloud.netapp.com/blog/amazon-s3-as-a-file-system
*Option 2:*
You can use AWS Glue for your use case.
Here are the details:
https://aws.amazon.com/blogs/big-data/how-to-access-and-analyze-on-premises-data-stores-using-aws-glue/

*Option 3:*
Store the file in the local file system and later push it to the S3 bucket.
Here are the details:
https://stackoverflow.com/questions/48067979/simplest-way-to-fetch-the-file-from-ftp-server-on-prem-put-into-s3-bucket

Thanks,
Divya

On Tue, 23 Oct 2018 at 15:53,  wrote:

> Hi guys,
>
>
>
> We are using Apache Spark on a local machine.
>
>
>
> I need to implement the scenario below.
>
>
>
> In the initial load:
>
>1. CRM application will send a file to a folder. This file contains
>customer information of all customers. This file is in a folder in the
>local server. File name is: customer.tsv
>   1. Customer.tsv contains customerid, country, birty_month,
>   activation_date etc
>2. I need to read the contents of customer.tsv.
>3. I will add current timestamp info to the file.
>4. I will transfer customer.tsv to the S3 bucket: customer.history.data
>
>
>
> In the daily loads:
>
>1. CRM application will send a new file which contains the
>updated/deleted/inserted customer information. File name is:
>daily_customer.tsv
>   1. Daily_customer.tsv contains customerid, cdc_field,
>   country, birty_month, activation_date etc. Cdc field can be
>   New-Customer, Customer-is-Updated, Customer-is-Deleted.
>2. I need to read the contents of daily_customer.tsv.
>3. I will add current timestamp info to the file.
>4. I will transfer daily_customer.tsv to the S3 bucket:
>customer.daily.data
>5. I need to merge two buckets customer.history.data and
>customer.daily.data.
>   1. Two buckets have timestamp fields. So I need to query all
>   records whose timestamp is the last timestamp.
>   2. I can use row_number() over(partition by customer_id order by
>   timestamp_field desc) as version_number
>   3. Then I can put the records whose version is one, to the final
>   bucket: customer.dimension.data
>
>
>
> I am running Spark on premise.
>
>- Can I query on AWS S3 buckets by using Spark Sql / Dataframe or RDD
>on a local Spark cluster?
>- Is this approach efficient? Will the queries transfer all historical
>data from AWS S3 to the local cluster?
>- How can I implement this scenario in a more effective way? Like just
>transferring daily data to AWS S3 and then running queries on AWS.
>   - For instance Athena can query on AWS. But it is just a query
>   engine. As far as I know I cannot call it by using an SDK and I cannot
>   write the results to a bucket/folder.
>
>
>
> Thanks in advance,
>
> Ömer
>
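The merge step described in the quoted message (per customer, keep only the record with the latest timestamp, using row_number()) can be sketched in Scala roughly as follows. This is only a minimal sketch under assumptions: the two inputs are small in-memory stand-ins rather than the real buckets, both have already been aligned to the same columns, and the object and method names are made up for illustration. Handling of Customer-is-Deleted records is not covered.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object LatestCustomerVersion {
  // Keep, for every customer_id, only the row with the newest timestamp_field.
  def latestPerCustomer(history: DataFrame, daily: DataFrame): DataFrame = {
    val newestFirst = Window
      .partitionBy("customer_id")
      .orderBy(col("timestamp_field").desc)

    history.union(daily) // assumption: same columns, in the same order
      .withColumn("version_number", row_number().over(newestFirst))
      .filter(col("version_number") === 1)
      .drop("version_number")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("latest-customer-version")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-ins for customer.history.data and customer.daily.data.
    val history = Seq((1, "TR", 100L), (2, "DE", 100L))
      .toDF("customer_id", "country", "timestamp_field")
    val daily = Seq((1, "US", 200L))
      .toDF("customer_id", "country", "timestamp_field")

    latestPerCustomer(history, daily).show()
    spark.stop()
  }
}
```

In a real job the two DataFrames would be read from storage and the result written to the final location instead of being shown.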


Re: Fastest way to drop useless columns

2018-05-31 Thread Divya Gehlot
You can try the dropDuplicates function:

https://github.com/spirom/LearningSpark/blob/master/src/main/scala/dataframe/DropDuplicates.scala

On 31 May 2018 at 16:34,  wrote:

> Hi there !
>
> I have a potentially large dataset ( regarding number of rows and cols )
>
> And I want to find the fastest way to drop some useless cols for me, i.e.
> cols containing only an unique value !
>
> I want to know what do you think that I could do to do this as fast as
> possible using spark.
>
>
> I already have a solution using distinct().count() or approxCountDistinct()
> But, they may not be the best choice as this requires to go through all
> the data, even if the 2 first tested values for a col are already different
> ( and in this case I know that I can keep the col )
>
>
> Thx for your ideas !
>
> Julien
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
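For reference, the approximate distinct-count approach mentioned in the quoted message can be run as one aggregation over all columns instead of one job per column. The sketch below assumes Spark 2.1 or later (for approx_count_distinct) and column names without dots; the object and method names are illustrative only. It still scans all the data, so it does not give the early exit Julien asks about.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{approx_count_distinct, col}

object DropConstantColumns {
  // Returns df without the columns that hold at most one distinct non-null value.
  def dropConstant(df: DataFrame): DataFrame = {
    if (df.columns.isEmpty) return df
    // One aggregation covering every column, so the data is scanned once.
    val counts = df.columns.map(c => approx_count_distinct(col(c)).as(c))
    val row = df.agg(counts.head, counts.tail: _*).head()
    val constant = df.columns.filter(c => row.getAs[Long](c) <= 1L)
    df.drop(constant: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("drop-constant-columns")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample: column "b" holds a single value and should be dropped.
    val df = Seq((1, "x", 10), (2, "x", 10), (3, "x", 20)).toDF("a", "b", "c")
    dropConstant(df).show()
    spark.stop()
  }
}
```

Nulls are ignored by the count, so a column that is entirely null is treated as constant here.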


Re: [Spark Java] Add new column in DataSet based on existed column

2018-03-28 Thread Divya Gehlot
Hi ,

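Here is an example snippet in Scala:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, to_date}

// Convert to a Date type
val timestamp2datetype: (Column) => Column = (x) => { to_date(x) }
df = df.withColumn("date", timestamp2datetype(col("end_date"))) // df must be a var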

Hope this helps !

Thanks,

Divya



On 28 March 2018 at 15:16, Junfeng Chen  wrote:

> I am working on adding a date transformed field on existed dataset.
>
> The current dataset contains a column named timestamp in ISO format. I
> want to parse this field to joda time type, and then extract the year,
> month, day, hour info as new column attaching to original dataset.
> I have tried df.withColumn function, but it seems only support simple
> expression rather than customized function like MapFunction.
> How to solve it?
>
> Thanks!
>
>
>
> Regard,
> Junfeng Chen
>
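The year, month, day and hour columns asked for in the quoted message can also be derived with built-in column functions instead of a custom MapFunction. The sketch below is in Scala (the same functions are available to Java as static methods on org.apache.spark.sql.functions) and assumes the ISO string is in a form Spark can cast to a timestamp, such as 2018-03-28T15:16:00. The object and method names are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, dayofmonth, hour, month, year}

object TimestampParts {
  // Adds year/month/day/hour columns derived from the string column "timestamp".
  def withDateParts(df: DataFrame): DataFrame = {
    val ts = col("timestamp").cast("timestamp")
    df.withColumn("year", year(ts))
      .withColumn("month", month(ts))
      .withColumn("day", dayofmonth(ts))
      .withColumn("hour", hour(ts))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("timestamp-parts")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample row.
    val df = Seq("2018-03-28T15:16:00").toDF("timestamp")
    withDateParts(df).show()
    spark.stop()
  }
}
```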


[Error :] RDD TO Dataframe Spark Streaming

2018-01-31 Thread Divya Gehlot
Hi,
I am getting the error below when creating a DataFrame from a Twitter streaming RDD:

val sparkSession:SparkSession = SparkSession
.builder
.appName("twittertest2")
.master("local[*]")
.enableHiveSupport()
.getOrCreate()
val sc = sparkSession.sparkContext
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))

case class Tweet(createdAt:Long, text:String)

import org.apache.spark.sql.types._
import sparkSession.implicits._
def row(line: List[String]): Row = Row(line(0).toLong, line(1).toString)

val schema =
  StructType(
    StructField("createdAT", LongType, false) ::
      StructField("Text", StringType, true) :: Nil)


twt.map(status =>
  Tweet(status.getCreatedAt().getTime() / 1000, status.getText())
).foreachRDD(rdd =>
  rdd.toDF()
)


Error :
Error:(106, 15) value toDF is not a member of
org.apache.spark.rdd.RDD[Tweet]
  rdd.toDF()

So much confusion in Spark 2 regarding the Spark Session :(

Appreciate the help!

Thanks,
Divya


Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Divya Gehlot
Got it. Thanks for the clarification, TD!

On Thu, 1 Feb 2018 at 11:36 AM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> The code uses the format "socket", which is only for text sent over a
> simple socket and is completely different from how the Twitter APIs work.
> So this won't work at all.
> Fundamentally, for Structured Streaming, we have focused only on those
> streaming sources that have the capabilities of record-level offset tracking
> (e.g. Kafka offsets) and replayability in order to give strong exactly-once
> fault-tolerance guarantees. Hence we have focused on files, Kafka, Kinesis
> (socket is just for testing, as is documented). Twitter APIs as a source
> do not provide those, hence we have not focused on building one. In
> general, for such sources (ones that are not perfectly replayable), there
> are two possible solutions.
>
> 1. Build your own source: A quick Google search shows that others in the
> community have attempted to build Structured Streaming sources for Twitter.
> It won't provide the same fault-tolerance guarantees as Kafka, etc. However,
> I don't recommend this now because the DataSource APIs to build streaming
> sources are not public yet, and are in flux.
>
> 2. Use Kafka/Kinesis as an intermediate system: Write something simple
> that uses Twitter APIs directly to read tweets and write them into
> Kafka/Kinesis. And then just read from Kafka/Kinesis.
>
> Hope this helps.
>
> TD
>
> On Wed, Jan 31, 2018 at 7:18 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>> I see. Does that mean Spark Structured Streaming doesn't work with
>> Twitter streams?
>> I could see people have used Kafka or other streaming tools, and used Spark to
>> process the data in Structured Streaming.
>>
>> So the code below won't work directly with a Twitter stream until I set up
>> Kafka?
>>
>>> import org.apache.spark.sql.SparkSession
>>> val spark = SparkSession
>>>   .builder()
>>>   .appName("Spark SQL basic example")
>>>   .config("spark.some.config.option", "some-value")
>>>   .getOrCreate()
>>> // For implicit conversions like converting RDDs to DataFrames
>>> import spark.implicits._
>>>
>>> // Read text from socket
>>> val socketDF = spark
>>>   .readStream
>>>   .format("socket")
>>>   .option("host", "localhost")
>>>   .option("port", )
>>>   .load()
>>>
>>> socketDF.isStreaming    // Returns True for DataFrames that have streaming sources
>>>
>>> socketDF.printSchema
>>>
>>
>>
>> Thanks,
>> Divya
>>
>> On 1 February 2018 at 10:30, Tathagata Das <tathagata.das1...@gmail.com>
>> wrote:
>>
>>> Hello Divya,
>>>
>>> To add further clarification, the Apache Bahir does not have any
>>> Structured Streaming support for Twitter. It only has support for Twitter +
>>> DStreams.
>>>
>>> TD
>>>
>>>
>>>
>>> On Wed, Jan 31, 2018 at 2:44 AM, vermanurag <
>>> anurag.ve...@fnmathlogic.com> wrote:
>>>
>>>> Twitter functionality is not part of Core Spark. We have successfully
>>>> used
>>>> the following packages from maven central in past
>>>>
>>>> org.apache.bahir:spark-streaming-twitter_2.11:2.2.0
>>>>
>>>> Earlier there used to be a twitter package under spark, but I find that
>>>> it
>>>> has not been updated beyond Spark 1.6
>>>> org.apache.spark:spark-streaming-twitter_2.11:1.6.0
>>>>
>>>> Anurag
>>>> www.fnmathlogic.com
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>>
>>>>
>>>>
>>>
>>
>


Re: Spark Structured Streaming for Twitter Streaming data

2018-01-31 Thread Divya Gehlot
Hi,
I see. Does that mean Spark Structured Streaming doesn't work with Twitter
streams?
I could see people have used Kafka or other streaming tools, and used Spark to
process the data in Structured Streaming.

So the code below won't work directly with a Twitter stream until I set up Kafka?

> import org.apache.spark.sql.SparkSession
> val spark = SparkSession
>   .builder()
>   .appName("Spark SQL basic example")
>   .config("spark.some.config.option", "some-value")
>   .getOrCreate()
> // For implicit conversions like converting RDDs to DataFrames
> import spark.implicits._
>
> // Read text from socket
> val socketDF = spark
>   .readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", )
>   .load()
>
> socketDF.isStreaming    // Returns True for DataFrames that have streaming sources
>
> socketDF.printSchema
>


Thanks,
Divya

On 1 February 2018 at 10:30, Tathagata Das 
wrote:

> Hello Divya,
>
> To add further clarification, the Apache Bahir does not have any
> Structured Streaming support for Twitter. It only has support for Twitter +
> DStreams.
>
> TD
>
>
>
> On Wed, Jan 31, 2018 at 2:44 AM, vermanurag 
> wrote:
>
>> Twitter functionality is not part of Core Spark. We have successfully used
>> the following packages from maven central in past
>>
>> org.apache.bahir:spark-streaming-twitter_2.11:2.2.0
>>
>> Earlier there used to be a twitter package under spark, but I find that it
>> has not been updated beyond Spark 1.6
>> org.apache.spark:spark-streaming-twitter_2.11:1.6.0
>>
>> Anurag
>> www.fnmathlogic.com
>>
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>>
>>
>


Spark Structured Streaming for Twitter Streaming data

2018-01-30 Thread Divya Gehlot
Hi,
I am exploring Spark Structured Streaming.
When I turned to the internet to learn about it, I found that it is mostly
integrated with Kafka or other streaming tools like Kinesis.
I couldn't find where we can use the Spark Streaming API for Twitter streaming
data.
I would be grateful if anybody who has used it or done some work with it can guide me.
Pardon me if I have understood it wrongly.

Thanks,
Divya


[Error] Python version mismatch in CDH cluster when running pyspark job

2017-06-16 Thread Divya Gehlot
Hi ,
I have a CDH cluster and am running a PySpark script in client mode.
Different Python versions are installed on the client and worker nodes, and I
was getting a Python version mismatch error.
To resolve this issue I followed the Cloudera document below:
https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_troubleshooting.html#workloads__job_fail_python

I added the lines below:
export PYSPARK_PYTHON=/usr/bin/python/
export PYSPARK_DRIVER_PYTHON=python

I am still getting the version mismatch error.
Has anybody encountered this issue?
Can you please share how you resolved it?
I would really appreciate the help.

PS - attaching the screen shot of the code added .

Thanks,
Divya


Spark job stopping abruptly

2017-03-07 Thread Divya Gehlot
Hi,
I have a Spark standalone cluster on AWS EC2, and recently my Spark Streaming
jobs have been stopping abruptly.
When I checked the logs I found this:

17/03/07 06:09:39 INFO ProtocolStateActor: No response from remote.
Handshake timed out or transport failure detector triggered.
17/03/07 06:09:39 ERROR WorkerWatcher: Lost connection to worker rpc
endpoint akka.tcp://sparkwor...@xxx.xx.xx.xxx:44271/user/Worker.
Exiting.
17/03/07 06:09:39 WARN CoarseGrainedExecutorBackend: An unknown
(xxx.xx.x.xxx:44271) driver disconnected.

17/03/07 06:09:39 INFO DiskBlockManager: Shutdown hook called

I suspect it is due to the network connection between the AWS instances of the Spark
cluster.

Could somebody help me shed more light on it?


Thanks
Divya


query on Spark Log directory

2017-01-05 Thread Divya Gehlot
Hi ,
I am using an EMR machine and I can see that the Spark log directory has grown
to 4G.

File name: spark-history-server.out

I need advice on how I can reduce the size of the above-mentioned file.

Is there a config property which can help me?



Thanks,

Divya


Re: Location for the additional jar files in Spark

2016-12-27 Thread Divya Gehlot
Hi Mich ,

Have you set SPARK_CLASSPATH in spark-env.sh?


Thanks,
Divya

On 27 December 2016 at 17:33, Mich Talebzadeh 
wrote:

> When one runs in Local mode (one JVM) on an edge host (the host user
> accesses the cluster), it is possible to put additional jar file say
> accessing Oracle RDBMS tables in $SPARK_CLASSPATH. This works
>
> export SPARK_CLASSPATH=~/user_jars/ojdbc6.jar
>
> Normally a group of users can have read access to a shared directory like
> above and once they log in their shell will invoke an environment file that
> will have the above classpath plus additional parameters like $JAVA_HOME
> etc are set up for them.
>
> However, if the user chooses to run spark through spark-submit with yarn,
> then the only way this will work in my research is to add the jar file as
> follows on every node of Spark cluster
>
> in $SPARK_HOME/conf/spark-defaults.conf
>
> Add the jar path to the following:
>
> spark.executor.extraClassPath   /user_jars/ojdbc6.jar
>
> Note that setting both spark.executor.extraClassPath and SPARK_CLASSPATH
> will cause initialisation error
>
> ERROR SparkContext: Error initializing SparkContext.
> org.apache.spark.SparkException: Found both spark.executor.extraClassPath
> and SPARK_CLASSPATH. Use only the former.
>
> I was wondering if there are other ways of making this work in YARN mode,
> where every node of cluster will require this JAR file?
>
> Thanks
>


Re: Has anyone managed to connect to Oracle via JDBC from Spark CDH 5.5.2

2016-12-21 Thread Divya Gehlot
Hi Mich,

Can you try placing these jars on the Spark classpath?
It should work.

Thanks,
Divya

On 22 December 2016 at 05:40, Mich Talebzadeh 
wrote:

> This works with Spark 2 with Oracle jar file added to
>
> $SPARK_HOME/conf/ spark-defaults.conf
>
>
>
>
> spark.driver.extraClassPath  /home/hduser/jars/ojdbc6.jar
>
> spark.executor.extraClassPath  /home/hduser/jars/ojdbc6.jar
>
>
> and you get
>
> scala> val s = HiveContext.read.format("jdbc").options(
>  | Map("url" -> _ORACLEserver,
>  | "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
> CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
> RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>  | "partitionColumn" -> "ID",
>  | "lowerBound" -> "1",
>  | "upperBound" -> "1",
>  | "numPartitions" -> "10",
>  | "user" -> _username,
>  | "password" -> _password)).load
> s: org.apache.spark.sql.DataFrame = [ID: string, CLUSTERED: string ... 5
> more fields]
> that works.
> However, with CDH 5.5.2 (Spark 1.5) it fails with error
>
> *java.sql.SQLException: No suitable driver*
>
>   at java.sql.DriverManager.getDriver(DriverManager.java:315)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.
> JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.
> JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
>
>   at scala.Option.getOrElse(Option.scala:121)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.
> createConnectionFactory(JdbcUtils.scala:53)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.
> JDBCRDD$.resolveTable(JDBCRDD.scala:123)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(
> JDBCRelation.scala:117)
>
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.
> createRelation(JdbcRelationProvider.scala:53)
>
>   at org.apache.spark.sql.execution.datasources.
> DataSource.resolveRelation(DataSource.scala:315)
>
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
>
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
>
>
> Any ideas?
>
> Thanks
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: How to get recent value in spark dataframe

2016-12-20 Thread Divya Gehlot
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-windows.html

Hope this helps


Thanks,
Divya

On 15 December 2016 at 12:49, Milin korath 
wrote:

> Hi
>
> I have a spark data frame with following structure
>
>  id  flag  price  date
>   a   0    100    2015
>   a   0    50     2015
>   a   1    200    2014
>   a   1    300    2013
>   a   0    400    2012
>
> I need to create a data frame with recent value of flag 1 and updated in
> the flag 0 rows.
>
>  id  flag  price  date  new_column
>   a   0    100    2015  200
>   a   0    50     2015  200
>   a   1    200    2014  null
>   a   1    300    2013  null
>   a   0    400    2012  null
>
> We have 2 rows having flag=0. Consider the first row(flag=0),I will have 2
> values(200 and 300) and I am taking the recent one 200(2014). And the last
> row I don't have any recent value for flag 1 so it is updated with null.
>
> Looking for a solution using scala. Any help would be appreciated.Thanks
>
> Thanks
> Milin
>
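Along the lines of the window-function page linked above, the new_column in the quoted example could be computed roughly as below. This is a sketch, not a confirmed solution from the thread: it assumes "date" is an integer column (as in the sample), interprets "recent" as the flag = 1 row with the latest strictly earlier date for the same id, and uses made-up object and method names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last, when}

object RecentFlaggedPrice {
  def withRecentPrice(df: DataFrame): DataFrame = {
    // All rows of the same id that have a strictly earlier date.
    val earlier = Window
      .partitionBy("id")
      .orderBy(col("date"))
      .rangeBetween(Window.unboundedPreceding, -1)

    // Price of the latest earlier row with flag = 1 (null if there is none).
    val recentFlagged =
      last(when(col("flag") === 1, col("price")), ignoreNulls = true).over(earlier)

    // Only flag = 0 rows receive a value; all other rows stay null.
    df.withColumn("new_column", when(col("flag") === 0, recentFlagged))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("recent-flagged-price")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // The sample rows from the quoted message.
    val df = Seq(
      ("a", 0, 100, 2015), ("a", 0, 50, 2015),
      ("a", 1, 200, 2014), ("a", 1, 300, 2013), ("a", 0, 400, 2012)
    ).toDF("id", "flag", "price", "date")

    withRecentPrice(df).show()
    spark.stop()
  }
}
```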


Re: What is the deployment model for Spark Streaming? A specific example.

2016-12-17 Thread Divya Gehlot
I am not a PySpark person,
but from the errors I could figure out that your Spark application is
having memory issues.
Are you collecting the results to the driver at any point, or have you
configured too little memory for the nodes?
Also, if you are using DataFrames, there is an issue raised in JIRA.


Hope this helps

Thanks,
Divya

On 16 December 2016 at 16:53, Russell Jurney 
wrote:

> I have created a PySpark Streaming application that uses Spark ML to
> classify flight delays into three categories: on-time, slightly late, very
> late. After an hour or so something times out and the whole thing crashes.
>
> The code and error are on a gist here:
> https://gist.github.com/rjurney/17d471bc98fd1ec925c37d141017640d
>
> While I am interested in why I am getting an exception, I am more
> interested in understanding what the correct deployment model is... because
> long running processes will have new and varied errors and exceptions.
> Right now with what I've built, Spark is a highly dependable distributed
> system but in streaming mode the entire thing is dependent on one Python
> PID going down. This can't be how apps are deployed in the wild because it
> will never be very reliable, right? But I don't see anything about this in
> the docs, so I am confused.
>
> Note that I use this to run the app, maybe that is the problem?
>
> ssc.start()
> ssc.awaitTermination()
>
>
> What is the actual deployment model for Spark Streaming? All I know to do
> right now is to restart the PID. I'm new to Spark, and the docs don't
> really explain this (that I can see).
>
> Thanks!
> --
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>


Re: spark reshape hive table and save to parquet

2016-12-14 Thread Divya Gehlot
You can use UDFs to do it:
http://stackoverflow.com/questions/31615657/how-to-add-a-new-struct-column-to-a-dataframe

Hope it will help.


Thanks,
Divya

On 9 December 2016 at 00:53, Anton Kravchenko 
wrote:

> Hello,
>
> I wonder if there is a way (preferably efficient) in Spark to reshape hive
> table and save it to parquet.
>
> Here is a minimal example, input hive table:
> col1 col2 col3
> 1 2 3
> 4 5 6
>
> output parquet:
> col1 newcol2
> 1 [2 3]
> 4 [5 6]
>
> p.s. The real input hive table has ~1000 columns.
>
> Thank you,
> Anton
>
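As an alternative to a UDF, the reshape in the quoted example can be sketched with the built-in array function, which scales to many columns because the column list is built programmatically. This assumes all packed columns share one type; the object, method and parameter names are illustrative only, and the sample data stands in for the Hive table.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, col}

object ReshapeToArray {
  // Keep keyColumn and pack every other column into a single array column.
  def packColumns(df: DataFrame, keyColumn: String, packedName: String): DataFrame = {
    val others = df.columns.filter(_ != keyColumn).map(col)
    df.select(col(keyColumn), array(others: _*).as(packedName))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("reshape-to-array")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the input Hive table from the quoted message.
    val input = Seq((1, 2, 3), (4, 5, 6)).toDF("col1", "col2", "col3")
    val reshaped = packColumns(input, "col1", "newcol2")
    reshaped.show()
    // In a real job the result would be saved with reshaped.write.parquet(...)
    // to an output path of your choice.
    spark.stop()
  }
}
```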


Re: How many Spark streaming applications can be run at a time on a Spark cluster?

2016-12-14 Thread Divya Gehlot
It depends on the use case.
Spark always depends on resource availability.
As long as you have the resources to accommodate them, you can run as many Spark/Spark
Streaming applications as you need.


Thanks,
Divya

On 15 December 2016 at 08:42, shyla deshpande 
wrote:

> How many Spark streaming applications can be run at a time on a Spark
> cluster?
>
> Is it better to have 1 spark streaming application to consume all the
> Kafka topics or have multiple streaming applications when possible to keep
> it simple?
>
> Thanks
>
>


Re: Best practice for preprocessing feature with DataFrame

2016-11-16 Thread Divya Gehlot
Hi,

You can use the Column functions provided by Spark API

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html

Hope this helps .

Thanks,
Divya


On 17 November 2016 at 12:08, 颜发才(Yan Facai)  wrote:

> Hi,
> I have a sample, like:
> +---+------+--------------------+
> |age|gender|             city_id|
> +---+------+--------------------+
> |   |     1|1042015:city_2044...|
> |90s|     2|1042015:city_2035...|
> |80s|     2|1042015:city_2061...|
> +---+------+--------------------+
>
> and expectation is:
> "age":  90s -> 90, 80s -> 80
> "gender": 1 -> "male", 2 -> "female"
>
> I have two solutions:
> 1. Handle each column separately,  and then join all by index.
> val age = input.select("age").map(...)
> val gender = input.select("gender").map(...)
> val result = ...
>
> 2. Write a udf function for each column, and then use them together:
>  val result = input.select(ageUDF($"age"), genderUDF($"gender"))
>
> However, both are awkward,
>
> Does anyone have a better work flow?
> Write some custom Transforms and use pipeline?
>
> Thanks.
>
>
>
>
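Following the suggestion to use the built-in column functions, the two mappings in the quoted message (90s -> 90, 1 -> "male") could be sketched without UDFs or joins as below. The assumptions are that "age" is a string column, "gender" is an integer column, and unknown values should become null; the object and method names are made up.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, regexp_replace, when}

object CleanFeatures {
  def clean(df: DataFrame): DataFrame =
    df
      // "90s" -> 90; a blank age cannot be cast and becomes null.
      .withColumn("age", regexp_replace(col("age"), "s$", "").cast("int"))
      // 1 -> "male", 2 -> "female"; any other value becomes null.
      .withColumn("gender",
        when(col("gender") === 1, "male")
          .when(col("gender") === 2, "female"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("clean-features")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical rows shaped like the quoted sample (city_id omitted).
    val input = Seq(("", 1), ("90s", 2), ("80s", 2)).toDF("age", "gender")
    clean(input).show()
    spark.stop()
  }
}
```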


Re: how to extract arraytype data to file

2016-10-18 Thread Divya Gehlot
http://stackoverflow.com/questions/33864389/how-can-i-create-a-spark-dataframe-from-a-nested-array-of-struct-element

Hope this helps


Thanks,
Divya

On 19 October 2016 at 11:35, lk_spark  wrote:

> hi,all:
> I want to read a json file and search it by sql .
> the data struct should be :
>
> bid: string (nullable = true)
> code: string (nullable = true)
>
> and the json file data should be like :
>  {"bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"}
>  {"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}
> but in fact my json file data is :
> {"bizs":[ {"bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}]}
> {"bizs":[ {"bid":"MzI4MTI5Mzcy00==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc201==","code":"西早君"}]}
> I load it by spark ,data schema shows like this :
>
> root
>  |-- bizs: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- bid: string (nullable = true)
>  |||-- code: string (nullable = true)
>
>
> I can select columns by : df.select("bizs.id","bizs.name")
> but the colume values is in array type:
> +++
> |  id|code|
> +++
> |[4938200, 4938201...|[罗甸网警, 室内设计师杨焰红, ...|
> |[4938300, 4938301...|[SDCS十全九美, 旅梦长大, ...|
> |[4938400, 4938401...|[日重重工液压行走回转, 氧老家,...|
> |[4938500, 4938501...|[PABXSLZ, 陈少燕, 笑蜜...|
> |[4938600, 4938601...|[税海微云, 西域美农云家店, 福...|
> +++
>
> what I want is I can read colum in normal row type. how I can do it ?
> 2016-10-19
> --
> lk_spark
>
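One common way to get one row per array element for a schema like the one quoted above is explode. The sketch below assumes Spark 2.2 or later (for reading JSON from a Dataset of strings) and uses the field names bid and code from the printed schema; the object and method names are illustrative.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

object FlattenBizs {
  // Turns each element of the "bizs" array into its own row with plain columns.
  def flatten(df: DataFrame): DataFrame =
    df.select(explode(col("bizs")).as("biz"))
      .select(col("biz.bid").as("bid"), col("biz.code").as("code"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("flatten-bizs")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // One JSON line shaped like the quoted data.
    val json = Seq(
      """{"bizs":[{"bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}]}"""
    ).toDS()

    flatten(spark.read.json(json)).show()
    spark.stop()
  }
}
```

The flattened result can then be registered as a temporary view and queried with SQL.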


Re: tutorial for access elements of dataframe columns and column values of a specific rows?

2016-10-18 Thread Divya Gehlot
Can you please elaborate on your use case?

On 18 October 2016 at 15:48, muhammet pakyürek  wrote:

>
>
>
>
>
> --
> *From:* muhammet pakyürek 
> *Sent:* Monday, October 17, 2016 11:51 AM
> *To:* user@spark.apache.org
> *Subject:* rdd and dataframe columns dtype
>
>
> how can i set columns dtype of rdd
>
>
>
>


Re: Is spark a right tool for updating a dataframe repeatedly

2016-10-17 Thread Divya Gehlot
If my understanding of your query is correct:
in Spark, DataFrames are immutable, so you can't update a DataFrame in place.
You have to create a new DataFrame to update the current one.


Thanks,
Divya


On 17 October 2016 at 09:50, Mungeol Heo  wrote:

> Hello, everyone.
>
> As I mentioned in the title, I wonder whether Spark is the right tool for
> updating a data frame repeatedly until there is no more data to
> update.
>
> For example.
>
> while (if there was a updating) {
> update a data frame A
> }
>
> If it is the right tool, then what is the best practice for this kind of
> work?
> Thank you.
>
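To illustrate the point about immutability, the loop from the quoted message can be written so that each pass builds a new DataFrame and rebinds a variable to it. This is only a sketch with a made-up update rule (raise values below a target by one per pass) and made-up names; whether such a loop performs well depends on the job, since every pass runs a count and lengthens the query plan.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, when}

object IterativeUpdate {
  // Hypothetical update rule: raise every value below `target` by one per pass.
  def raiseTo(df: DataFrame, target: Int): DataFrame = {
    var current = df
    // Each pass creates a new DataFrame; the loop ends when no row needs updating.
    while (current.filter(col("value") < target).count() > 0) {
      current = current.withColumn("value",
        when(col("value") < target, col("value") + 1).otherwise(col("value")))
    }
    current
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("iterative-update")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    raiseTo(Seq(1, 3, 2).toDF("value"), 3).show()
    spark.stop()
  }
}
```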


Re: converting hBaseRDD to DataFrame

2016-10-11 Thread Divya Gehlot
Hi Mich ,

You can also create a DataFrame from an RDD in the following ways:
val df = sqlContext.createDataFrame(rdd, schema)
val df = sqlContext.createDataFrame(rdd)


The article below may also help you:
http://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/



On 11 October 2016 at 02:02, Mich Talebzadeh 
wrote:

>
> Hi,
>
> I am trying to do some operation on an Hbase table that is being populated
> by Spark Streaming.
>
> Now this is just Spark on Hbase as opposed to Spark on Hive -> view on
> Hbase etc. I also have Phoenix view on this Hbase table.
>
> This is sample code
>
> scala> val tableName = "marketDataHbaseTest"
> scala> val conf = HBaseConfiguration.create()
> conf: org.apache.hadoop.conf.Configuration = Configuration:
> core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
> yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
> hbase-default.xml, hbase-site.xml
> scala> conf.set(TableInputFormat.INPUT_TABLE, tableName)
> scala> //create rdd
> scala> val hBaseRDD = sc.newAPIHadoopRDD(conf,
> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
> hBaseRDD:
> org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable,
> org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[4] at
> newAPIHadoopRDD at :64
> scala> hBaseRDD.count
> res11: Long = 22272
>
> Now that I have hBaseRDD ,is there anyway I can create a DF on it? I
> understand that it is not as simple as doing toDF on RDD
>
> scala>  hBaseRDD.toDF
> java.lang.AssertionError: assertion failed: no symbol could be loaded from
> interface org.apache.hadoop.hbase.classification.InterfaceAudience$Public
> in object InterfaceAudience with name Public and classloader
> scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@7b44e98e
>
> Thanks
>
> Dr Mich Talebzadeh
>
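The createDataFrame(rdd, schema) form suggested above expects an RDD of Row objects plus a matching schema. The sketch below shows that shape with a plain RDD of string pairs standing in for the HBase data; for hBaseRDD itself, the (ImmutableBytesWritable, Result) pairs would first have to be mapped to ordinary values such as strings, which is not shown here. The schema, column names and object name are assumptions for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object PairsToDataFrame {
  // Hypothetical two-column schema: a row key and one value.
  val schema: StructType = StructType(Seq(
    StructField("rowkey", StringType, nullable = false),
    StructField("value", StringType, nullable = true)))

  // Converts plain (key, value) pairs to Rows and applies the schema.
  def toDataFrame(spark: SparkSession, pairs: RDD[(String, String)]): DataFrame = {
    val rows = pairs.map { case (k, v) => Row(k, v) }
    spark.createDataFrame(rows, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("pairs-to-dataframe")
      .master("local[*]")
      .getOrCreate()

    // Stand-in for values already extracted from the HBase results.
    val pairs = spark.sparkContext.parallelize(Seq(("key1", "v1"), ("key2", "v2")))
    toDataFrame(spark, pairs).show()
    spark.stop()
  }
}
```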


read multiple files

2016-09-27 Thread Divya Gehlot
Hi,
The input data files for my Spark job are generated every five minutes. The file
names follow an epoch time convention, as below:

InputFolder/batch-147495960
InputFolder/batch-147495990
InputFolder/batch-147496020
InputFolder/batch-147496050
InputFolder/batch-147496080
InputFolder/batch-147496110
InputFolder/batch-147496140
InputFolder/batch-147496170
InputFolder/batch-147496200
InputFolder/batch-147496230

As per the requirement I need to read one month of data counting back from the current timestamp.

I would really appreciate it if anybody could help me.

Thanks,
Divya
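
One possible way to approach this is to select the batch folders by the epoch value in their names before handing the paths to Spark. The sketch below is plain Scala and rests on assumptions: the suffix after "batch-" is taken to be epoch seconds, "one month" is taken as one calendar month in UTC, and the object and method names are made up.

```scala
import java.time.{Instant, ZoneOffset, ZonedDateTime}

object BatchPaths {
  private val BatchName = """.*batch-(\d+)$""".r

  // Extracts the epoch value from a path such as "InputFolder/batch-<epoch>".
  def epochOf(path: String): Option[Long] = path match {
    case BatchName(digits) => Some(digits.toLong)
    case _                 => None
  }

  // Keeps the paths whose epoch lies within the month ending at `now`.
  def lastMonth(paths: Seq[String], now: Instant): Seq[String] = {
    val from = ZonedDateTime.ofInstant(now, ZoneOffset.UTC)
      .minusMonths(1).toInstant.getEpochSecond
    val to = now.getEpochSecond
    paths.filter(p => epochOf(p).exists(t => t >= from && t <= to))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical folder names and reference time.
    val paths = Seq("InputFolder/batch-1474959600", "InputFolder/batch-1400000000")
    lastMonth(paths, Instant.ofEpochSecond(1474962300L)).foreach(println)
  }
}
```

The selected paths could then be passed to a reader that accepts several paths, for example spark.read.text(selected: _*).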


Spark Application Log

2016-09-21 Thread Divya Gehlot
Hi,
I have initialised the logging in my spark App

/*Initialize Logging */
val log = Logger.getLogger(getClass.getName)
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

log.warn("Some text"+Somemap.size)


When I run my Spark job using spark-submit as below:

spark-submit \
--master yarn-client \
--driver-memory 1G \
--executor-memory 1G \
--executor-cores 1 \
--num-executors 2 \
--class MainClass /home/hadoop/Spark-assembly-1.0.jar

I could see the log in terminal itself

16/09/22 03:45:31 WARN MainClass$: SomeText  : 10


When I set up this job in a scheduler,

where can I see these logs?


Thanks,

Divya


Re: write.df is failing on Spark Cluster

2016-09-20 Thread Divya Gehlot
Which Spark version are you using?

On 21 September 2016 at 09:46, Sankar Mittapally <
sankar.mittapa...@creditvidya.com> wrote:

> Yeah I can do all operations on that folder
>
> On Sep 21, 2016 12:15 AM, "Kevin Mellott" 
> wrote:
>
>> Are you able to manually delete the folder below? I'm wondering if there
>> is some sort of non-Spark factor involved (permissions, etc).
>>
>> /nfspartition/sankar/banking_l1_v2.csv
>>
>> On Tue, Sep 20, 2016 at 12:19 PM, Sankar Mittapally <
>> sankar.mittapa...@creditvidya.com> wrote:
>>
>>> I used that one also
>>>
>>> On Sep 20, 2016 10:44 PM, "Kevin Mellott" 
>>> wrote:
>>>
 Instead of *mode="append"*, try *mode="overwrite"*

 On Tue, Sep 20, 2016 at 11:30 AM, Sankar Mittapally <
 sankar.mittapa...@creditvidya.com> wrote:

> Please find the code below.
>
> sankar2 <- read.df("/nfspartition/sankar/test/2016/08/test.json")
>
> I tried these two commands.
> write.df(sankar2,"/nfspartition/sankar/test/test.csv","csv",
> header="true")
>
> saveDF(sankar2,"sankartest.csv",source="csv",mode="append",schema="true")
>
>
>
> On Tue, Sep 20, 2016 at 9:40 PM, Kevin Mellott <
> kevin.r.mell...@gmail.com> wrote:
>
>> Can you please post the line of code that is doing the df.write
>> command?
>>
>> On Tue, Sep 20, 2016 at 9:29 AM, Sankar Mittapally <
>> sankar.mittapa...@creditvidya.com> wrote:
>>
>>> Hey Kevin,
>>>
>>> It is a empty directory, It is able to write part files to the
>>> directory but while merging those part files we are getting above error.
>>>
>>> Regards
>>>
>>>
>>> On Tue, Sep 20, 2016 at 7:46 PM, Kevin Mellott <
>>> kevin.r.mell...@gmail.com> wrote:
>>>
 Have you checked to see if any files already exist at
 /nfspartition/sankar/banking_l1_v2.csv? If so, you will need to
 delete them before attempting to save your DataFrame to that location.
 Alternatively, you may be able to specify the "mode" setting of the
 df.write operation to "overwrite", depending on the version of Spark 
 you
 are running.

 *ERROR (from log)*
 16/09/17 08:03:28 WARN FileUtil: Failed to delete file or
 dir[/nfspartition/sankar/banking_l1_v2.csv/_temporary/0/task_201609170802_0013_m_00/.part-r-0-46a7f178-2490-444e-9110-510978eaaecb.csv.crc]:
 it still exists.
 16/09/17 08:03:28 WARN FileUtil: Failed to delete file or
 dir[/nfspartition/sankar/banking_l1_v2.csv/_temporary/0/task_201609170802_0013_m_00/part-r-0-46a7f178-2490-444e-9110-510978eaaecb.csv]:
 it still exists.

 *df.write Documentation*
 http://spark.apache.org/docs/latest/api/R/write.df.html

 Thanks,
 Kevin

 On Tue, Sep 20, 2016 at 12:16 AM, sankarmittapally <
 sankar.mittapa...@creditvidya.com> wrote:

> We have set up a Spark cluster on NFS shared storage. There are no
> permission issues with the NFS storage; all users are able to write to it.
> When I ran the write.df command in SparkR, I got the error below. Can
> someone please help me fix this issue?
>
>
> 16/09/17 08:03:28 ERROR InsertIntoHadoopFsRelationCommand: Aborting job.
> java.io.IOException: Failed to rename
> DeprecatedRawLocalFileStatus{path=file:/nfspartition/sankar/banking_l1_v2.csv/_temporary/0/task_201609170802_0013_m_00/part-r-0-46a7f178-2490-444e-9110-510978eaaecb.csv;
> isDirectory=false; length=436486316; replication=1; blocksize=33554432;
> modification_time=147409940; access_time=0; owner=; group=;
> permission=rw-rw-rw-; isSymlink=false}
> to
> file:/nfspartition/sankar/banking_l1_v2.csv/part-r-0-46a7f178-2490-444e-9110-510978eaaecb.csv
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:371)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326)
> at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopF
> 

Re: 1TB shuffle failed with executor lost failure

2016-09-19 Thread Divya Gehlot
Exit code 52 comes from org.apache.spark.util.SparkExitCode, where it is
defined as val OOM = 52, i.e. an OutOfMemoryError.
Refer to:
https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
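
As a small illustration of that lookup, here is a minimal Python sketch that maps a container exit status to the constant name in SparkExitCode. Only the three codes I am sure of are listed, and the helper name and fallback message are made up for this example.

```python
# Exit codes as defined in org.apache.spark.util.SparkExitCode (Spark 1.6).
# Only a subset is listed here; anything else is reported as unknown.
SPARK_EXIT_CODES = {
    50: "UNCAUGHT_EXCEPTION",
    51: "UNCAUGHT_EXCEPTION_TWICE",
    52: "OOM",
}


def describe_exit_status(status):
    """Return a readable name for an executor exit status (hypothetical helper)."""
    name = SPARK_EXIT_CODES.get(status)
    if name is None:
        return "exit status %d: not a SparkExitCode listed here" % status
    return "exit status %d: SparkExitCode.%s" % (status, name)


print(describe_exit_status(52))
```

An "Exit status: 52" in the YARN diagnostics therefore points at the executor JVM running out of memory rather than at YARN itself.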



On 19 September 2016 at 14:57, Cyanny LIANG  wrote:

> My job joins a 1 TB table with a 10 GB table on Spark 1.6.1, running in
> YARN mode.
>
> *1. If I enable the shuffle service, the error is:*
> Job aborted due to stage failure: ShuffleMapStage 2 (writeToDirectory at
> NativeMethodAccessorImpl.java:-2) has failed the maximum allowable number
> of times: 4. Most recent failure reason:
> org.apache.spark.shuffle.FetchFailedException:
> java.lang.RuntimeException: Executor is not registered
> (appId=application_1473819702737_1239, execId=52)
> at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:105)
> at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>
> *2. If I disable the shuffle service and set spark.executor.instances to 80,*
> the error is:
> ExecutorLostFailure (executor 71 exited caused by one of the running
> tasks) Reason: Container marked as failed:
> container_1473819702737_1432_01_406847560
> on host: nmg01-spark-a0021.nmg01.baidu.com. Exit status: 52. Diagnostics:
> Exception from container-launch: ExitCodeException exitCode=52:
> ExitCodeException exitCode=52:
>
> These errors are reported in the shuffle stage.
> My data is skewed: some ids have 400 million rows, while others have only
> 1 million rows. Does anybody have ideas for solving this problem?
>
>
> *3. My config is:*
> I use tungsten-sort in off-heap mode; in on-heap mode the OOM problem is
> more serious.
>
> spark.driver.cores 4
> spark.driver.memory 8g
> # use on client mode
> spark.yarn.am.memory 8g
> spark.yarn.am.cores 4
> spark.executor.memory 8g
> spark.executor.cores 4
> spark.yarn.executor.memoryOverhead 6144
> spark.memory.offHeap.enabled true
> spark.memory.offHeap.size 40
>
>
> Best & Regards
> Cyanny LIANG
>


how to specify cores and executor to run spark jobs simultaneously

2016-09-14 Thread Divya Gehlot
Hi,

I am on an EMR cluster, and my cluster configuration is as below:
Number of nodes including master node - 3
Memory: 22.50 GB
VCores Total: 16
Active Nodes: 2
Spark version - 1.6.1

Parameters set in spark-defaults.conf:

> spark.executor.instances 2
> spark.executor.cores 8
> spark.driver.memory 10473M
> spark.executor.memory 9658M
> spark.default.parallelism 32


Let me know if you need any other information about the cluster.

The current configuration for spark-submit is
--driver-memory 5G \
--executor-memory 2G \
--executor-cores 5 \
--num-executors 10 \


Currently, with the above job configuration, if I try to run another Spark
job it stays in the ACCEPTED state until the first one finishes.
How do I optimize or update the above spark-submit configuration to run
more Spark jobs simultaneously?

Would really appreciate the help.

Thanks,
Divya
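
A rough sketch of the arithmetic behind this question, in Python. It rests on assumptions that are not stated in the message: that Spark 1.6 on YARN adds a per-container overhead of max(384 MB, 10% of the JVM heap), that the driver also runs inside YARN (cluster deploy mode), and that YARN's rounding up to its minimum allocation size can be ignored. The function names and the "smaller" request are purely illustrative.

```python
# Rough YARN memory arithmetic for the spark-submit settings in this message.
# Assumptions (not from the thread): per-container overhead is
# max(384 MB, 10% of the heap), the driver runs inside YARN, and YARN's
# rounding to its minimum allocation size is ignored.

CLUSTER_MB = int(22.50 * 1024)  # "Memory: 22.50 GB" from the cluster summary


def container_mb(heap_mb):
    """Approximate memory YARN must reserve for one Spark JVM."""
    overhead_mb = max(384, heap_mb // 10)
    return heap_mb + overhead_mb


def job_footprint_mb(num_executors, executor_mb, driver_mb):
    """Total memory one job asks YARN for: all executors plus the driver."""
    return num_executors * container_mb(executor_mb) + container_mb(driver_mb)


# Settings from the message: 10 executors x 2G, driver 5G.
current = job_footprint_mb(10, 2048, 5120)
# A smaller, purely illustrative request: 2 executors x 2G, driver 1G.
smaller = job_footprint_mb(2, 2048, 1024)

print("current request:", current, "MB of", CLUSTER_MB, "MB")
print("jobs that fit with the smaller request:", CLUSTER_MB // smaller)
```

Under these assumptions the current request (29952 MB) is larger than the whole cluster (23040 MB), which would be consistent with a second job waiting in the ACCEPTED state. Treat the numbers as an estimate, since the memory actually available to YARN containers may differ from the reported total.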


Re: [Error:] viewing Web UI on EMR cluster

2016-09-13 Thread Divya Gehlot
Hi,
Thank you all.
Hurray, I am now able to view the Hadoop web UI on port 8088, and even the
Spark History Server web UI on port 18080.
But I am unable to figure out the Spark web UI port.
I tried 4044 and 4040 and
got the error below:
This site can’t be reached
How can I find out the Spark port?

Would really appreciate the help.

Thanks,
Divya


On 13 September 2016 at 15:09, Divya Gehlot <divya.htco...@gmail.com> wrote:

> Hi,
> Thanks all for your prompt response.
> I followed the instruction in the docs EMR SSH tunnel
> <https://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-ssh-tunnel.html>
> shared by Jonathan.
> I am on a Mac and have set up FoxyProxy in my Chrome browser.
>
> Divyas-MacBook-Pro:.ssh divyag$ ssh  -N -D 8157
> had...@ec2-xx-xxx-xxx-xx.ap-southeast-1.compute.amazonaws.com
>
> channel 3: open failed: connect failed: Connection refused
>
> channel 3: open failed: connect failed: Connection refused
>
> channel 4: open failed: connect failed: Connection refused
>
> channel 3: open failed: connect failed: Connection refused
>
> channel 4: open failed: connect failed: Connection refused
>
> channel 3: open failed: connect failed: Connection refused
>
> channel 3: open failed: connect failed: Connection refused
>
> channel 4: open failed: connect failed: Connection refused
>
> channel 5: open failed: connect failed: Connection refused
>
> channel 22: open failed: connect failed: Connection refused
>
> channel 23: open failed: connect failed: Connection refused
>
> channel 22: open failed: connect failed: Connection refused
>
> channel 23: open failed: connect failed: Connection refused
>
> channel 22: open failed: connect failed: Connection refused
>
> channel 8: open failed: administratively prohibited: open failed
>
>
> What am I missing now ?
>
>
> Thanks,
>
> Divya
>
> On 13 September 2016 at 14:23, Jonathan Kelly <jonathaka...@gmail.com>
> wrote:
>
>> I would not recommend opening port 50070 on your cluster, as that would
>> give the entire world access to your data on HDFS. Instead, you should
>> follow the instructions found here to create a secure tunnel to the
>> cluster, through which you can proxy requests to the UIs using a browser
>> plugin like FoxyProxy:
>> https://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-ssh-tunnel.html
>>
>> ~ Jonathan
>>
>> On Mon, Sep 12, 2016 at 10:40 PM Mohammad Tariq <donta...@gmail.com>
>> wrote:
>>
>>> Hi Divya,
>>>
>>> Do you have inbound traffic enabled on port 50070 of your NN machine? Also,
>>> it's a good idea to have the public DNS in your /etc/hosts for proper name
>>> resolution.
>>>
>>>
>>> Tariq, Mohammad
>>> about.me/mti
>>>
>>> On Tue, Sep 13, 2016 at 9:28 AM, Divya Gehlot <divya.htco...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I am on EMR 4.7 with Spark 1.6.1   and Hadoop 2.7.2
>>>> When I try to view any of the web UIs of the cluster, either Hadoop
>>>> or Spark, I get the error below:
>>>> "
>>>> This site can’t be reached
>>>>
>>>> "
>>>> Is anybody using EMR able to view the web UI?
>>>> Could you please share the steps.
>>>>
>>>> Would really appreciate the help.
>>>>
>>>> Thanks,
>>>> Divya
>>>>
>>>
>>>
>


Ways to check Spark submit running

2016-09-13 Thread Divya Gehlot
Hi,

For the time being I am somehow unable to view the Spark web UI and the
Hadoop web UI.
I am looking for other ways to check that my job is running fine, apart
from repeatedly checking the current YARN logs.


Thanks,
Divya
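
One possible approach, sketched in Python, is to ask the YARN ResourceManager REST API for the running applications instead of opening the web UI. The host name below is a placeholder and the helper names are hypothetical; the script would need to run somewhere that can reach the ResourceManager port, for example on the master node.

```python
# Sketch: list running YARN applications through the ResourceManager REST API
# instead of the web UI. The host name is a placeholder, not from the thread.
import json
from urllib.request import urlopen

RM_URL = "http://resourcemanager.example.com:8088"  # hypothetical host


def summarize_apps(payload):
    """Turn a /ws/v1/cluster/apps response into (id, name, state, progress) tuples."""
    apps = (payload.get("apps") or {}).get("app", [])
    return [(a["id"], a["name"], a["state"], a["progress"]) for a in apps]


def fetch_running_apps(rm_url=RM_URL):
    """Query the ResourceManager for applications in the RUNNING state."""
    with urlopen(rm_url + "/ws/v1/cluster/apps?states=RUNNING") as resp:
        return summarize_apps(json.load(resp))


# Offline example with a hand-written response in the shape the API returns.
sample = {"apps": {"app": [
    {"id": "application_0000000000000_0001", "name": "my-spark-job",
     "state": "RUNNING", "progress": 42.0},
]}}
print(summarize_apps(sample))
```

On the master node, the yarn application -list and yarn application -status <application id> commands report the same kind of information from the command line.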


[Error:] viewing Web UI on EMR cluster

2016-09-12 Thread Divya Gehlot
Hi,
I am on EMR 4.7 with Spark 1.6.1   and Hadoop 2.7.2
When I try to view any of the web UIs of the cluster, either Hadoop or
Spark, I get the error below:
"
This site can’t be reached

"
Is anybody using EMR able to view the web UI?
Could you please share the steps.

Would really appreciate the help.

Thanks,
Divya


Error while calling udf Spark submit

2016-09-08 Thread Divya Gehlot
Hi,
I am on Spark 1.6.1
I get the error below when I try to call a UDF on a column of my Spark
DataFrame.
UDF:
/* get the train line */
val deriveLineFunc: (String => String) = (str: String) => {
  // Split the comma-separated key into its parts.
  val build_key = str.split(",").toList
  // Look up (part0, part1) in the broadcast map; fall back to ("no", "no").
  val getValue =
    if (build_key.length > 1)
      MapBrdcst.value.getOrElse((build_key(0), build_key(1)), ("no", "no"))
    else ("no", "no")
  val (value: String, _) = getValue
  value
}

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 12.0 (TID 621,
ip-xxx-xx-xx-xx.ap-southeast-1.compute.internal):
java.lang.ExceptionInInitializerError



Any clue what went wrong?


Thanks,

Divya


Calling udf in Spark

2016-09-08 Thread Divya Gehlot
Hi,

Is it necessary to import sqlContext.implicits._ whenever I define and
call a UDF in Spark?


Thanks,
Divya


Getting memory error when starting spark shell but not often

2016-09-06 Thread Divya Gehlot
Hi,
I am using EMR 4.7 with Spark 1.6
Sometimes when I start the spark shell I get below error

OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x0005662c, 10632822784, 0) failed; error='Cannot
> allocate memory' (errno=12)
> #
> # There is insufficient memory for the Java Runtime Environment to
> continue.
> # Native memory allocation (malloc) failed to allocate 10632822784 bytes
> for committing reserved memory.
> # An error report file with more information is saved as:
> # /tmp/jvm-6066/hs_error.log



Has anybody encountered this kind of issue?
I would really appreciate a resolution.


Thanks,
Divya


Re: [Spark submit] getting error when use properties file parameter in spark submit

2016-09-06 Thread Divya Gehlot
Yes, I am reading from an S3 bucket.
Strangely, the error goes away when I remove the properties file parameter.

On Sep 6, 2016 8:35 PM, "Sonal Goyal" <sonalgoy...@gmail.com> wrote:

> Looks like a classpath issue - Caused by: java.lang.ClassNotFoundException:
> com.amazonaws.services.s3.AmazonS3
>
> Are you using S3 somewhere? Are the required jars in place?
>
> Best Regards,
> Sonal
> Founder, Nube Technologies <http://www.nubetech.co>
> Reifier at Strata Hadoop World
> <https://www.youtube.com/watch?v=eD3LkpPQIgM>
> Reifier at Spark Summit 2015
> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Tue, Sep 6, 2016 at 4:45 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>> I am getting the error below when I try to use the --properties-file
>> parameter in spark-submit:
>>
>> Exception in thread "main" java.util.ServiceConfigurationError:
>> org.apache.hadoop.fs.FileSystem: Provider 
>> org.apache.hadoop.fs.s3a.S3AFileSystem
>> could not be instantiated
>> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
>> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
>> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
>> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
>> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2673)
>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2684)
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2701)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2737)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2719)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:375)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
>> at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:142)
>> at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:653)
>> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
>> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:415)
>> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>> at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
>> at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:651)
>> at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>> Caused by: java.lang.NoClassDefFoundError: com/amazonaws/services/s3/AmazonS3
>> at java.lang.Class.getDeclaredConstructors0(Native Method)
>> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2595)
>> at java.lang.Class.getConstructor0(Class.java:2895)
>> at java.lang.Class.newInstance(Class.java:354)
>> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
>> ... 19 more
>> Caused by: java.lang.ClassNotFoundException:
>> com.amazonaws.services.s3.AmazonS3
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> ... 24 more
>> End of LogType:stderr
>>
>> If I remove the --properties-file parameter
>> the error is gone
>>
>> Would really appreciate the help .
>>
>>
>>
>> Thanks,
>> Divya
>>
>
>


[Spark submit] getting error when use properties file parameter in spark submit

2016-09-06 Thread Divya Gehlot
Hi,
I am getting the error below when I try to use the --properties-file
parameter in spark-submit:

Exception in thread "main" java.util.ServiceConfigurationError:
org.apache.hadoop.fs.FileSystem: Provider
org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:224)
at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2673)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2684)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2701)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2737)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2719)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:375)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
at
org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:142)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:653)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
at
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:651)
at
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.lang.NoClassDefFoundError:
com/amazonaws/services/s3/AmazonS3
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2595)
at java.lang.Class.getConstructor0(Class.java:2895)
at java.lang.Class.newInstance(Class.java:354)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
... 19 more
Caused by: java.lang.ClassNotFoundException:
com.amazonaws.services.s3.AmazonS3
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 24 more
End of LogType:stderr

If I remove the --properties-file parameter
the error is gone

Would really appreciate the help .



Thanks,
Divya


[Spark-Submit:]Error while reading from s3n

2016-09-06 Thread Divya Gehlot
Hi,
I am on EMR 4.7 with Spark 1.6.1
I am trying to read from s3n buckets in spark
Option 1 :
If I set up

hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
hadoopConf.set("fs.s3.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
hadoopConf.set("fs.s3.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))


and access the bucket as s3://bucket-name

I am getting below error

Exception in thread "main" java.io.IOException: /batch-147313410
doesn't exist
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy37.retrieveINode(Unknown Source)
at 
org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1730)
at 
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:231)
at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201)
at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:281)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)

Option 2:

If I set up

hadoopConf.set("fs.s3n.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
hadoopConf.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))


and try to access the bucket as s3n://bucket-name

getting the below error :


Caused by: org.apache.hadoop.security.AccessControlException:
Permission denied: s3n://bucket-name/batch-147313710_$folder$
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:449)
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)




When I try to list the bucket using aws cli

aws s3 ls s3://bucket-name/


It gives me the bucket listing.


Would really appreciate the help.



Thanks,

Divya


Re: difference between package and jar Option in Spark

2016-09-04 Thread Divya Gehlot
Hi,
I am using spark-csv to parse my input files .
If I use the --packages option it works fine, but if I download
<https://mvnrepository.com/artifact/com.databricks/spark-csv_2.10/1.4.0>
the jar and use the --jars option,
it throws a class-not-found exception.


Thanks,
Divya

On 1 September 2016 at 17:26, Sean Owen <so...@cloudera.com> wrote:

> --jars includes a local JAR file in the application's classpath.
> --packages references the Maven coordinates of a dependency, then
> retrieves that dependency together with its transitive dependencies and
> includes all of those JAR files in the app classpath.
>
> On Thu, Sep 1, 2016 at 10:24 AM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
> > Hi,
> >
> > I would like to know the difference between the --packages and --jars
> > options in Spark.
> >
> >
> >
> > Thanks,
> > Divya
>


Re: [Error:]while read s3 buckets in Spark 1.6 in spark -submit

2016-09-02 Thread Divya Gehlot
Hi Steve,
I am trying to read it from s3n://"bucket" and have already included
aws-java-sdk 1.7.4 in my classpath.
My machine is AWS EMR with Hadoop 2.7.2 and Spark 1.6.1 installed.
The post below suggests that this is an issue with EMR Hadoop 2.7.2:
http://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
Is that really the issue?
Could somebody help me validate the above?


Thanks,
Divya



On 1 September 2016 at 16:59, Steve Loughran <ste...@hortonworks.com> wrote:

>
> On 1 Sep 2016, at 03:45, Divya Gehlot <divya.htco...@gmail.com> wrote:
>
> Hi,
> I am using Spark 1.6.1 on an EMR machine.
> I am trying to read S3 buckets in my Spark job.
> When I read them through the Spark shell it works, but when I package the
> job and run it with spark-submit I get the error below:
>
> 16/08/31 07:36:38 INFO ApplicationMaster: Registered signal handlers for
> [TERM, HUP, INT]
>
>> 16/08/31 07:36:39 INFO ApplicationMaster: ApplicationAttemptId:
>> appattempt_1468570153734_2851_01
>> Exception in thread "main" java.util.ServiceConfigurationError:
>> org.apache.hadoop.fs.FileSystem: Provider 
>> org.apache.hadoop.fs.s3a.S3AFileSystem
>> could not be instantiated
>> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
>> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
>> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
>>
> I have already included
>
>  "com.amazonaws" % "aws-java-sdk-s3" % "1.11.15",
>
> in my build.sbt
>
>
> Assuming you are using a released version of Hadoop 2.6 or 2.7 underneath
> spark, you will need to make sure your classpath has aws-java-sdk 1.7.4 on
> your CP. You can't just drop in a new JAR as it is incompatible at the API
> level ( https://issues.apache.org/jira/browse/HADOOP-12269 )
>
>
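> <dependency>
>   <groupId>com.amazonaws</groupId>
>   <artifactId>aws-java-sdk</artifactId>
>   <version>1.7.4</version>
>   <scope>compile</scope>
> </dependency>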
>
>
> and jackson artifacts databind and annotations in sync with the rest of
> your app
>
>
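> <dependency>
>   <groupId>com.fasterxml.jackson.core</groupId>
>   <artifactId>jackson-databind</artifactId>
> </dependency>
> <dependency>
>   <groupId>com.fasterxml.jackson.core</groupId>
>   <artifactId>jackson-annotations</artifactId>
> </dependency>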
>
>
> I also tried providing the access key in my job, but the same error
> persists.
>
> When I googled it, I found that if you have an IAM role created there is no
> need to provide an access key.
>
>
>
> You don't get IAM support until Hadoop 2.8 ships. sorry. Needed a fair
> amount of reworking of how S3A does authentication.
>
> Note that if you launch spark jobs with the AWS environment variables set,
> these will be automatically picked up and used to set the relevant
> properties in the configuration.
>


difference between package and jar Option in Spark

2016-09-01 Thread Divya Gehlot
Hi,

I would like to know the difference between the --packages and --jars
options in Spark.



Thanks,
Divya


Re: Window Functions with SQLContext

2016-09-01 Thread Divya Gehlot
Hi Saurabh,

I am also using Spark 1.6+, and when I didn't create a HiveContext it
threw the same error.
So I had to create a HiveContext to use window functions.

Thanks,
Divya

On 1 September 2016 at 13:16, saurabh3d  wrote:

> Hi All,
>
> As per SPARK-11001,
> Window functions should be supported by SQLContext. But when i try to run
>
> SQLContext sqlContext = new SQLContext(jsc);
> WindowSpec w = Window.partitionBy("assetId").orderBy("assetId");
> DataFrame df_2 = df1.withColumn("row_number", row_number().over(w));
> df_2.show(false);
>
> it fails with:
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Could
> not
> resolve window function 'row_number'. Note that, using window functions
> currently requires a HiveContext;
>
> This code runs fine with HiveContext.
> Any idea what’s going on?  Is this a known issue and is there a workaround
> to make Window function work without HiveContext.
>
> Thanks,
> Saurabh
>


Re: Spark build 1.6.2 error

2016-08-31 Thread Divya Gehlot
Which Java version are you using?

On 31 August 2016 at 04:30, Diwakar Dhanuskodi  wrote:

> Hi,
>
> While building Spark 1.6.2, I am getting the error below in spark-sql. I
> would much appreciate any help.
>
> [ERROR] missing or invalid dependency detected while loading class file
> 'WebUI.class'.
> Could not access term eclipse in package org,
> because it (or its dependencies) are missing. Check your build definition
> for
> missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see
> the problematic classpath.)
> A full rebuild may help if 'WebUI.class' was compiled against an
> incompatible version of org.
> [ERROR] missing or invalid dependency detected while loading class file
> 'WebUI.class'.
> Could not access term jetty in value org.eclipse,
> because it (or its dependencies) are missing. Check your build definition
> for
> missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see
> the problematic classpath.)
> A full rebuild may help if 'WebUI.class' was compiled against an
> incompatible version of org.eclipse.
> [WARNING] 17 warnings found
> [ERROR] two errors found
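> [INFO] ------------------------------------------------------------------------
> [INFO] Reactor Summary:
> [INFO]
> [INFO] Spark Project Parent POM .......................... SUCCESS [4.399s]
> [INFO] Spark Project Test Tags ........................... SUCCESS [3.443s]
> [INFO] Spark Project Launcher ............................ SUCCESS [10.131s]
> [INFO] Spark Project Networking .......................... SUCCESS [11.849s]
> [INFO] Spark Project Shuffle Streaming Service ........... SUCCESS [6.641s]
> [INFO] Spark Project Unsafe .............................. SUCCESS [19.765s]
> [INFO] Spark Project Core ................................ SUCCESS [4:16.511s]
> [INFO] Spark Project Bagel ............................... SUCCESS [13.401s]
> [INFO] Spark Project GraphX .............................. SUCCESS [1:08.824s]
> [INFO] Spark Project Streaming ........................... SUCCESS [2:18.844s]
> [INFO] Spark Project Catalyst ............................ SUCCESS [2:43.695s]
> [INFO] Spark Project SQL ................................. FAILURE [1:01.762s]
> [INFO] Spark Project ML Library .......................... SKIPPED
> [INFO] Spark Project Tools ............................... SKIPPED
> [INFO] Spark Project Hive ................................ SKIPPED
> [INFO] Spark Project Docker Integration Tests ............ SKIPPED
> [INFO] Spark Project REPL ................................ SKIPPED
> [INFO] Spark Project YARN Shuffle Service ................ SKIPPED
> [INFO] Spark Project YARN ................................ SKIPPED
> [INFO] Spark Project Assembly ............................ SKIPPED
> [INFO] Spark Project External Twitter .................... SKIPPED
> [INFO] Spark Project External Flume Sink ................. SKIPPED
> [INFO] Spark Project External Flume ...................... SKIPPED
> [INFO] Spark Project External Flume Assembly ............. SKIPPED
> [INFO] Spark Project External MQTT ....................... SKIPPED
> [INFO] Spark Project External MQTT Assembly .............. SKIPPED
> [INFO] Spark Project External ZeroMQ ..................... SKIPPED
> [INFO] Spark Project External Kafka ...................... SKIPPED
> [INFO] Spark Project Examples ............................ SKIPPED
> [INFO] Spark Project External Kafka Assembly ............. SKIPPED
> [INFO] ------------------------------------------------------------------------
> [INFO] BUILD FAILURE
> [INFO] ------------------------------------------------------------------------
> [INFO] Total time: 12:40.525s
> [INFO] Finished at: Wed Aug 31 01:56:50 IST 2016
> [INFO] Final Memory: 71M/830M
> [INFO] ------------------------------------------------------------------------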
> [ERROR] Failed to execute goal 
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile
> (scala-compile-first) on project spark-sql_2.11: Execution
> scala-compile-first of goal 
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile
> failed. CompileFailed -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the
> -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
> [ERROR]
> [ERROR] After correcting the problems, you can resume the build with the
> command
> [ERROR]   mvn <goals> -rf :spark-sql_2.11
>
>
>


[Error:]while read s3 buckets in Spark 1.6 in spark -submit

2016-08-31 Thread Divya Gehlot
Hi,
I am using Spark 1.6.1 on an EMR machine.
I am trying to read S3 buckets in my Spark job.
When I read them through the Spark shell it works, but when I package the
job and run it with spark-submit I get the error below:

16/08/31 07:36:38 INFO ApplicationMaster: Registered signal handlers for
[TERM, HUP, INT]

> 16/08/31 07:36:39 INFO ApplicationMaster: ApplicationAttemptId:
> appattempt_1468570153734_2851_01
> Exception in thread "main" java.util.ServiceConfigurationError:
> org.apache.hadoop.fs.FileSystem: Provider
> org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2673)
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2684)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2701)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2737)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2719)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:375)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
> at
> org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:142)
> at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:653)
> at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
> at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
> at
> org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:651)
> at
> org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
> Caused by: java.lang.NoClassDefFoundError:
> com/amazonaws/services/s3/AmazonS3
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2595)
> at java.lang.Class.getConstructor0(Class.java:2895)
> at java.lang.Class.newInstance(Class.java:354)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
> ... 19 more
> Caused by: java.lang.ClassNotFoundException:
> com.amazonaws.services.s3.AmazonS3
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 24 more
> End of LogType:stderr



I have already included

 "com.amazonaws" % "aws-java-sdk-s3" % "1.11.15",

in my build.sbt


I also tried providing the access key in my job, but the same error
persists.

When I googled it, I found that if you have an IAM role created there is no
need to provide an access key.

Would really appreciate the help.


Thanks,

Divya


Re: Getting a TreeNode Exception while saving into Hadoop

2016-08-17 Thread Divya Gehlot
Can you please check the order of the columns in all the datasets used in the
unionAll operations?

Are they in the same order?

On 9 August 2016 at 02:47, max square  wrote:

> Hey guys,
>
> I'm trying to save Dataframe in CSV format after performing unionAll
> operations on it.
> But I get this exception -
>
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
> execute, tree:
> TungstenExchange hashpartitioning(mId#430,200)
>
> I'm saving it by
>
> df.write.format("com.databricks.spark.csv").options(Map("mode" ->
> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir +
> latest)
>
> It works perfectly if I don't do the unionAll operation.
> I see that the format isn't different by printing the part of the results.
>
> Any help regarding this would be appreciated.
>
>


[Spark 1.6]-increment value column based on condition + Dataframe

2016-08-09 Thread Divya Gehlot
Hi,
I have a column with values like
Value
30
12
56
23
12
16
12
89
12
5
6
4
8

I need to create another column
if col("value") > 30  1 else col("value") < 30
newColValue
0
1
0
1
2
3
4
0
1
2
3
4
5

How can I create such an incrementing column?
The grouping is based on some other columns which are not mentioned
here.
When I try the window sum function, it sums, but instead of incrementing,
the total sum is displayed in all the rows.
val overWin = Window.partitionBy('col1,'col2,'col3).orderBy('Value)
val total = sum('Value).over(overWin)

With this logic
I am getting the below result
0
1
0
4
4
4
4
0
5
5
5
5
5

I have also written my own UDF, but custom UDFs are not supported in window
functions in Spark 1.6.

Would really appreciate the help.


Thanks,
Divya
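
The expected output in the message suggests this rule: restart the counter at 0 whenever Value is 30 or more, otherwise add 1 to the previous row's counter. That reading is an assumption, since the condition is not fully spelled out in the post. A minimal sketch of the rule on a local Scala collection (not a DataFrame; `RunningCounter` and `threshold` are illustrative names):

```scala
object RunningCounter {
  // Restart the counter at 0 on every row whose value is >= threshold,
  // otherwise continue counting from the previous row.
  def counters(values: Seq[Int], threshold: Int = 30): Seq[Int] =
    values.scanLeft(-1)((prev, v) => if (v >= threshold) 0 else prev + 1).tail

  def main(args: Array[String]): Unit = {
    val values = Seq(30, 12, 56, 23, 12, 16, 12, 89, 12, 5, 6, 4, 8)
    println(counters(values).mkString(","))
  }
}
```

The result depends on the original row order. A window ordered by Value itself would rearrange the rows, so a DataFrame version would probably need a separate column that records the row order.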




Am I missing something


Re: [Spark1.6] Or (||) operator not working in DataFrame

2016-08-07 Thread Divya Gehlot
I tried with condition expression  also but it didn't work :(

On Aug 8, 2016 11:13 AM, "Chanh Le" <giaosu...@gmail.com> wrote:

> You should use *df.where(conditionExpr)* which is more convenient to
> express some simple term in SQL.
>
>
> /**
>  * Filters rows using the given SQL expression.
>  * {{{
>  *   peopleDf.where("age > 15")
>  * }}}
>  * @group dfops
>  * @since 1.5.0
>  */
> def where(conditionExpr: String): DataFrame = {
>   filter(Column(SqlParser.parseExpression(conditionExpr)))
> }
>
>
>
>
>
> On Aug 7, 2016, at 10:58 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
> although the logic should be col1 <> a && col(1) <> b
>
> to exclude both
>
> Like
>
> df.filter('transactiontype > " ").filter(not('transactiontype ==="DEB") &&
> not('transactiontype ==="BGC")).select('transactiontype).distinct.
> collect.foreach(println)
>
> HTH
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 7 August 2016 at 16:53, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> try similar to this
>>
>> df.filter(not('transactiontype ==="DEB") || not('transactiontype
>> ==="CRE"))
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 7 August 2016 at 15:43, Divya Gehlot <divya.htco...@gmail.com> wrote:
>>
>>> Hi,
>>> I have a use case where I need to use the or (||) operator in a filter
>>> condition.
>>> It does not seem to work: it takes the condition before the operator
>>> and ignores the filter condition after the or operator.
>>> Has anybody faced a similar issue?
>>>
>>> Pseudo code:
>>> df.filter(col("colName").notEqual("no_value") ||
>>> col("colName").notEqual(""))
>>>
>>> Am I missing something?
>>> Would really appreciate the help.
>>>
>>>
>>> Thanks,
>>> Divya
>>>
>>
>>
>
>


[Spark1.6] Or (||) operator not working in DataFrame

2016-08-07 Thread Divya Gehlot
Hi,
I have a use case where I need to use the or (||) operator in a filter
condition.
It does not seem to work: it takes the condition before the operator and
ignores the filter condition after the or operator.
Has anybody faced a similar issue?

Pseudo code:
df.filter(col("colName").notEqual("no_value") ||
col("colName").notEqual(""))

Am I missing something?
Would really appreciate the help.


Thanks,
Divya
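
As noted in the replies, excluding both values needs && rather than ||. A value can never equal both literals at once, so at least one of the two inequalities always holds and the || filter keeps every non-null row. A small sketch on a plain Scala collection (the sample values are made up for illustration):

```scala
object OrVersusAnd {
  val values = Seq("no_value", "", "abc")

  // A string can never equal both literals, so one inequality always holds.
  val keptWithOr: Seq[String] = values.filter(v => v != "no_value" || v != "")

  // Excluding both literals requires both inequalities to hold.
  val keptWithAnd: Seq[String] = values.filter(v => v != "no_value" && v != "")

  def main(args: Array[String]): Unit = {
    println(keptWithOr)
    println(keptWithAnd)
  }
}
```

The same boolean logic should apply to Column expressions combined with || and &&.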


Re: [Spark1.6]:compare rows and add new column based on lookup

2016-08-04 Thread Divya Gehlot
 based on the time stamp column

On 5 August 2016 at 10:43, ayan guha <guha.a...@gmail.com> wrote:

> How do you know person1 is moving from street1 to street2 and not other
> way around? Basically, how do you ensure the order of the rows as you have
> written them?
>
> On Fri, Aug 5, 2016 at 12:16 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>> I am working with Spark 1.6 with Scala and using the DataFrame API.
>> I have a use case where I need to compare two rows and add an entry in a
>> new column based on a lookup table.
>> For example, my DF looks like:
>> col1     col2     newCol1
>> street1  person1
>> street2  person1  area1
>> street3  person1  area3
>> street5  person2
>> street6  person2  area5
>> street7  person4
>> street9  person4  area7
>>
>> The lookup table looks like:
>> street1 -> street2 - area1
>> street2 -> street3 - area3
>> street5 -> street6 - area5
>> street7 -> street9 - area7
>>
>> If a person moves from street1 to street2, then he is reaching area1.
>>
>>
>> Would really appreciate the help.
>>
>> Thanks,
>> Divya
>>
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


[Spark1.6]:compare rows and add new column based on lookup

2016-08-04 Thread Divya Gehlot
Hi,
I am working with Spark 1.6 with Scala and using the DataFrame API.
I have a use case where I need to compare two rows and add an entry in a
new column based on a lookup table.
For example, my DF looks like:
col1     col2     newCol1
street1  person1
street2  person1  area1
street3  person1  area3
street5  person2
street6  person2  area5
street7  person4
street9  person4  area7

The lookup table looks like:
street1 -> street2 - area1
street2 -> street3 - area3
street5 -> street6 - area5
street7 -> street9 - area7

If a person moves from street1 to street2, then he is reaching area1.


Would really appreciate the help.

Thanks,
Divya
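
The requirement can be read as: within each person, order the rows, pair each row with the previous one, and look up the (previous street, current street) pair. A minimal sketch on local collections, assuming an ordering column is available (the follow-up in this thread mentions a timestamp column; `ts`, `Visit` and `AreaLookup` are hypothetical names):

```scala
object AreaLookup {
  // ts is a hypothetical ordering column standing in for the timestamp.
  case class Visit(street: String, person: String, ts: Long)

  val lookup: Map[(String, String), String] = Map(
    ("street1", "street2") -> "area1",
    ("street2", "street3") -> "area3",
    ("street5", "street6") -> "area5",
    ("street7", "street9") -> "area7"
  )

  // For each person, pair every row with the previous row (by ts)
  // and look up the move; the first row of a person has no previous row.
  def withArea(visits: Seq[Visit]): Seq[(String, String, Option[String])] =
    visits.groupBy(_.person).toSeq.flatMap { case (_, rows) =>
      val sorted = rows.sortBy(_.ts)
      val previous: Seq[Option[String]] =
        Option.empty[String] +: sorted.init.map(v => Option(v.street))
      sorted.zip(previous).map { case (v, prev) =>
        (v.street, v.person, prev.flatMap(p => lookup.get((p, v.street))))
      }
    }.sortBy(_._1)
}
```

On a DataFrame the "previous row" step would typically be a lag over a window partitioned by person, followed by a join against the lookup table.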


Spark GraphFrames

2016-08-01 Thread Divya Gehlot
Hi,

Has anybody worked with GraphFrames?
Please let me know, as I need to know the real-world scenarios where it can
be used.


Thanks,
Divya


Re: FileUtil.fullyDelete does ?

2016-07-26 Thread Divya Gehlot
What happened in my use case?
I do know what it does :)
I need to know why they are deleting the source and destination file paths.

On Jul 26, 2016 10:20 PM, "praveenesh kumar" <praveen...@gmail.com> wrote:

>
> https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html#fullyDelete(java.io.File)
>
> On Tue, Jul 26, 2016 at 12:09 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Resending to right list
>> ------ Forwarded message --
>> From: "Divya Gehlot" <divya.htco...@gmail.com>
>> Date: Jul 26, 2016 6:51 PM
>> Subject: FileUtil.fullyDelete does ?
>> To: "user @spark" <user@spark.apache.org>
>> Cc:
>>
>> Hi,
>> When I am doing the using theFileUtil.copymerge function
>>
>> val file = "/tmp/primaryTypes.csv"
>>
>> FileUtil.fullyDelete(new File(file))
>>
>>  val destinationFile= "/tmp/singlePrimaryTypes.csv"
>>
>> FileUtil.fullyDelete(new File(destinationFile))
>>
>>  val counts = partitions.
>>
>> reduceByKey {case (x,y) => x + y}.
>>
>> sortBy {case (key, value) => -value}.
>>
>> map { case (key, value) => Array(key, value).mkString(",") }
>>
>>  counts.saveAsTextFile(file)
>>
>>  merge(file, destinationFile)
>>
>>
>> I am wondering here what does  FileUtil.fullyDelete(new 
>> File(destinationFile)) do ?
>>
>>   does it delete the merged file If yes,then how will we access the 
>> merged file ..?
>>
>>
>> Confused here ...
>>
>>
>>
>> Thanks,
>>
>> Divya
>>
>>
>>
>


FileUtil.fullyDelete does ?

2016-07-26 Thread Divya Gehlot
Hi,
I am using the FileUtil.copyMerge function:

import java.io.File
import org.apache.hadoop.fs.FileUtil

val file = "/tmp/primaryTypes.csv"
// Delete whatever currently exists at this path
FileUtil.fullyDelete(new File(file))

val destinationFile = "/tmp/singlePrimaryTypes.csv"
FileUtil.fullyDelete(new File(destinationFile))

val counts = partitions.
  reduceByKey { case (x, y) => x + y }.
  sortBy { case (key, value) => -value }.
  map { case (key, value) => Array(key, value).mkString(",") }

counts.saveAsTextFile(file)

merge(file, destinationFile)


I am wondering what FileUtil.fullyDelete(new File(destinationFile)) does here.

Does it delete the merged file? If yes, then how will we access
the merged file?


Confused here ...



Thanks,

Divya


[Error] : Save dataframe to csv using Spark-csv in Spark 1.6

2016-07-24 Thread Divya Gehlot
Hi,
I am getting the error below when I try to save a dataframe using Spark-CSV:

>
> final_result_df.write.format("com.databricks.spark.csv").option("header","true").save(output_path)


java.lang.NoSuchMethodError:
> scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
> at
> com.databricks.spark.csv.util.CompressionCodecs$.(CompressionCodecs.scala:29)
> at
> com.databricks.spark.csv.util.CompressionCodecs$.(CompressionCodecs.scala)
> at
> com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:189)
> at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:97)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:102)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:104)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:106)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:108)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:110)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:112)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:114)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:116)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:118)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:120)
> at $iwC$$iwC$$iwC$$iwC$$iwC.(:122)
> at $iwC$$iwC$$iwC$$iwC.(:124)
> at $iwC$$iwC$$iwC.(:126)
> at $iwC$$iwC.(:128)
> at $iwC.(:130)
> at (:132)
> at .(:136)
> at .()
> at .(:7)
> at .()
> at $print()




*I used the same with Spark 1.5 and never faced this issue before.*
Am I missing something?
Would really appreciate the help.


Thanks,
Divya


Create dataframe column from list

2016-07-22 Thread Divya Gehlot
Hi,
Can somebody help me create a dataframe column from a Scala list?
Would really appreciate the help.

Thanks ,
Divya


add hours to from_unixtimestamp

2016-07-21 Thread Divya Gehlot
Hi,
I need to add 8 hours to the result of from_unixtime:
df.withColumn("date_time", from_unixtime(col("unix_timestamp"), fmt))

I am trying to use a Joda-Time function:
def unixToDateTime(unix_timestamp: String): DateTime = {
  val utcTS = new DateTime(unix_timestamp.toLong * 1000L) + 8.hours
  utcTS
}

It throws the error: java.lang.UnsupportedOperationException: Schema for
type com.github.nscala_time.time.Imports.DateTime is not supported



Would really appreciate the help.


Thanks,
Divya
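
The error message indicates that Spark SQL has no schema for the Joda DateTime return type. One way to sidestep it is to do the shift on the numeric epoch value and only then format, so that nothing but a Long or a String is returned. A minimal sketch in plain Scala (`ShiftEpoch` and `shifted` are illustrative names, and the UTC zone is an assumption made to keep the output reproducible):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object ShiftEpoch {
  val EightHoursInSeconds: Long = 8L * 60 * 60

  // Shift the epoch value first, then format; no DateTime object is returned.
  def shifted(epochSeconds: Long, fmt: String): String = {
    val formatter = new SimpleDateFormat(fmt)
    // Fixed zone so the result does not depend on the machine's default zone.
    formatter.setTimeZone(TimeZone.getTimeZone("UTC"))
    formatter.format(new Date((epochSeconds + EightHoursInSeconds) * 1000L))
  }

  def main(args: Array[String]): Unit =
    println(shifted(0L, "yyyy-MM-dd HH:mm:ss"))
}
```

Assuming the unix_timestamp column is numeric, the same arithmetic should also work directly on the Column, by adding 8 * 3600 to it before passing it to from_unixtime, which avoids a UDF altogether.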


calculate time difference between consecutive rows

2016-07-20 Thread Divya Gehlot
I have a dataset of time as shown below :
Time1
07:30:23
07:34:34
07:38:23
07:39:12
07:45:20

I need to find the difference between two consecutive rows.
I googled and found that the *lag* function in *Spark* helps with this,
but it is giving me *null* in the result set.

Would really appreciate the help.


Thanks,
Divya
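
For reference, this is the result the sample data should give. The first row has no previous row, so a null there is expected from lag; nulls in the other rows would point to a different problem. A minimal sketch on a local collection (`TimeDiffs` is an illustrative name; times are assumed to fall on the same day):

```scala
object TimeDiffs {
  // Convert "HH:mm:ss" to seconds since midnight.
  def toSeconds(t: String): Int = {
    val Array(h, m, s) = t.split(":").map(_.toInt)
    h * 3600 + m * 60 + s
  }

  // The first row has no predecessor, so its difference is None,
  // much like lag yields null for the first row of a window.
  def diffs(times: Seq[String]): Seq[Option[Int]] = {
    val secs = times.map(toSeconds)
    if (secs.isEmpty) Seq.empty
    else Option.empty[Int] +: secs.zip(secs.tail).map { case (prev, cur) => Option(cur - prev) }
  }

  def main(args: Array[String]): Unit =
    println(diffs(Seq("07:30:23", "07:34:34", "07:38:23", "07:39:12", "07:45:20")))
}
```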


getting null when calculating time diff with unix_timestamp + spark 1.6

2016-07-20 Thread Divya Gehlot
Hi,

val lags=sqlContext.sql("select *,(unix_timestamp(time1,'$timeFmt') -
lag(unix_timestamp(time2,'$timeFmt'))) as time_diff  from df_table");

Instead of the time difference in seconds I am getting null.

Would really appreciate the help.


Thanks,
Divya
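
One possible cause is visible in the query string itself: it is a plain Scala string without the s prefix, so '$timeFmt' reaches SQL as literal text instead of the format. A minimal sketch of the difference (the value of `timeFmt` is hypothetical, since the thread does not show it):

```scala
object QueryBuilder {
  val timeFmt = "HH:mm:ss" // hypothetical format; not shown in the thread

  // No s prefix: the text $timeFmt is passed on literally.
  val plain: String = "unix_timestamp(time1,'$timeFmt')"

  // With the s prefix the Scala value is substituted.
  val interpolated: String = s"unix_timestamp(time1,'$timeFmt')"

  def main(args: Array[String]): Unit = {
    println(plain)
    println(interpolated)
  }
}
```

Separately, lag is a window function, so the query would probably also need an OVER clause with an ordering before it can return the previous row's value.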


Re: write and call UDF in spark dataframe

2016-07-20 Thread Divya Gehlot
Hi ,
To be very specific, I am looking for the UDF syntax, for example for a UDF
which takes a String as parameter and returns an Integer. How do we define the
return type?


Thanks,

Divya

On 21 July 2016 at 00:24, Andy Davidson <a...@santacruzintegration.com>
wrote:

> Hi Divya
>
> In general you will get better performance if you can minimize your use of
> UDFs. Spark 2.0/Tungsten does a lot of code generation. It will have to
> treat your UDF as a black box.
>
> Andy
>
> From: Rishabh Bhardwaj <rbnex...@gmail.com>
> Date: Wednesday, July 20, 2016 at 4:22 AM
> To: Rabin Banerjee <dev.rabin.baner...@gmail.com>
> Cc: Divya Gehlot <divya.htco...@gmail.com>, "user @spark" <
> user@spark.apache.org>
> Subject: Re: write and call UDF in spark dataframe
>
> Hi Divya,
>
> There is already a "from_unixtime" function in org.apache.spark.sql.functions.
> Rabin has used that in the SQL query; if you want to use it in
> the dataframe DSL you can try it like this:
>
> val new_df = df.select(from_unixtime($"time").as("newtime"))
>
>
> Thanks,
> Rishabh.
>
> On Wed, Jul 20, 2016 at 4:21 PM, Rabin Banerjee <
> dev.rabin.baner...@gmail.com> wrote:
>
>> Hi Divya ,
>>
>> Try,
>>
>> val df = sqlContext.sql("select from_unixtime(ts,'-MM-dd') as `ts` from 
>> mr")
>>
>> Regards,
>> Rabin
>>
>> On Wed, Jul 20, 2016 at 12:44 PM, Divya Gehlot <divya.htco...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> Could somebody share an example of writing and calling a UDF which converts
>>> a unix timestamp to a date-time?
>>>
>>>
>>> Thanks,
>>> Divya
>>>
>>
>>
>


difference between two consecutive rows of same column + spark + dataframe

2016-07-20 Thread Divya Gehlot
Hi,

I have a dataset of time as shown below :
Time1
07:30:23
07:34:34
07:38:23
07:39:12
07:45:20

I need to find the difference between two consecutive rows.
I googled and found that the *lag* function in *Spark* helps with this,
but it is giving me *null* in the result set.

Would really appreciate the help.


Thanks,
Divya


write and call UDF in spark dataframe

2016-07-20 Thread Divya Gehlot
Hi,
Could somebody share an example of writing and calling a UDF which converts a
unix timestamp to a date-time?


Thanks,
Divya


Re: Dynamically get value based on Map key in Spark Dataframe

2016-07-18 Thread Divya Gehlot
Hi Jacek,

Can you please share an example of how I can access a broadcasted map?
val pltStnMapBrdcst = sc.broadcast(keyValueMap)
val df_replacekeys = df_input.withColumn("map_values",
pltStnMapBrdcst.value.get("key"))

Is the above the right way to access the broadcasted map?



Thanks,
Divya


On 18 July 2016 at 23:06, Jacek Laskowski <ja...@japila.pl> wrote:

> See broadcast variable.
>
> Or (just a thought) do join between DataFrames.
>
> Jacek
>
> On 18 Jul 2016 9:24 a.m., "Divya Gehlot" <divya.htco...@gmail.com> wrote:
>
>> Hi,
>>
>> I have created a map by reading a text file
>> val keyValueMap = file_read.map(t => t.getString(0) ->
>> t.getString(4)).collect().toMap
>>
>> Now I have another dataframe where I need to dynamically replace all the
>> keys of Map with values
>> val df_input = reading the file as dataframe
>> val df_replacekeys =
>> df_input.withColumn("map_values",lit(keyValueMap (col("key"
>>
>> Would really appreciate the help .
>>
>>
>> Thanks,
>> Divya
>>
>>
>>


Dynamically get value based on Map key in Spark Dataframe

2016-07-18 Thread Divya Gehlot
Hi,

I have created a map by reading a text file
val keyValueMap = file_read.map(t => t.getString(0) ->
t.getString(4)).collect().toMap

Now I have another dataframe where I need to dynamically replace all the
keys of the Map with their values:
val df_input = reading the file as dataframe
val df_replacekeys =
df_input.withColumn("map_values",lit(keyValueMap (col("key"))))

Would really appreciate the help .


Thanks,
Divya


find two consective points

2016-07-15 Thread Divya Gehlot
Hi,
I have a huge data set similar to the one below:
timestamp,fieldid,point_id
1468564189,89,1
1468564090,76,4
1468304090,89,9
1468304090,54,6
1468304090,54,4


I have a configuration file of consecutive points:
1,9
4,6


For example, 1 and 9 are consecutive points; similarly, 4 and 6 are
consecutive points.

Now I need to group the data on field id with consecutive points.
The sample output should look like:
89, 1,4
54,4,6

Can somebody help me do it in Spark?


Thanks,
Divya
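
One possible reading of the requirement: for each field id, report every configured pair whose two points both occur for that field. A minimal sketch on local collections under that assumption (`Reading`, `ConsecutivePoints` and `matches` are illustrative names):

```scala
object ConsecutivePoints {
  case class Reading(timestamp: Long, fieldId: Int, pointId: Int)

  // For each field, keep the configured pairs whose two points both occur.
  def matches(readings: Seq[Reading], pairs: Seq[(Int, Int)]): Seq[(Int, Int, Int)] = {
    val pointsByField: Map[Int, Set[Int]] =
      readings.groupBy(_.fieldId).map { case (f, rs) => f -> rs.map(_.pointId).toSet }
    (for {
      (field, points) <- pointsByField.toSeq
      (a, b) <- pairs
      if points(a) && points(b)
    } yield (field, a, b)).sorted
  }
}
```

On the sample rows this reading yields 89,1,9 and 54,4,6. The message lists 89, 1,4 for the first line, so the intended rule may differ from the one assumed here.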


Re: Error joining dataframes

2016-05-18 Thread Divya Gehlot
Can you try var df_join = df1.join(df2,df1( "Id") ===df2("Id"),
"fullouter").drop(df1("Id"))
On May 18, 2016 2:16 PM, "ram kumar"  wrote:

I tried

scala> var df_join = df1.join(df2, "Id", "fullouter")
:27: error: type mismatch;
 found   : String("Id")
 required: org.apache.spark.sql.Column
   var df_join = df1.join(df2, "Id", "fullouter")
   ^

scala>

And I cant see the above method in
https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html#join(org.apache.spark.sql.DataFrame,%20org.apache.spark.sql.Column,%20java.lang.String)

On Wed, May 18, 2016 at 2:22 AM, Bijay Kumar Pathak 
wrote:

> Hi,
>
> Try this one:
>
>
> df_join = df1.*join*(df2, 'Id', "fullouter")
>
> Thanks,
> Bijay
>
>
> On Tue, May 17, 2016 at 9:39 AM, ram kumar 
> wrote:
>
>> Hi,
>>
>> I tried to join two dataframe
>>
>> df_join = df1.*join*(df2, ((df1("Id") === df2("Id")), "fullouter")
>>
>> df_join.registerTempTable("join_test")
>>
>>
>> When querying "Id" from "join_test"
>>
>> 0: jdbc:hive2://> *select Id from join_test;*
>> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
>> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>> 0: jdbc:hive2://>
>>
>> Is there a way to merge the value of df1("Id") and df2("Id") into one "Id"
>>
>> Thanks
>>
>
>
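
A note on merging the two Id columns: in a full outer join, a row that exists on only one side has a null Id on the other side, so dropping one of the Id columns can lose keys. Merging means taking whichever side has a value. A toy sketch with Scala Maps standing in for df1 and df2 (the data is made up for illustration):

```scala
object FullOuterJoin {
  // Toy stand-ins for df1 and df2, keyed by Id.
  val left: Map[Int, String] = Map(1 -> "a", 2 -> "b")
  val right: Map[Int, String] = Map(2 -> "x", 3 -> "y")

  // One Id per output row, taken from whichever side has it.
  val joined: Seq[(Int, Option[String], Option[String])] =
    (left.keySet ++ right.keySet).toSeq.sorted
      .map(id => (id, left.get(id), right.get(id)))

  def main(args: Array[String]): Unit = joined.foreach(println)
}
```

On DataFrames, the coalesce function in org.apache.spark.sql.functions applied to df1("Id") and df2("Id") should give the same single merged key column.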


[Spark 1.5.2]Check Foreign Key constraint

2016-05-11 Thread Divya Gehlot
Hi,
I am using Spark 1.5.2 with Apache Phoenix 4.4.
Spark 1.5.2 doesn't support subqueries in where conditions:
https://issues.apache.org/jira/browse/SPARK-4226

Is there any alternative way to check foreign key constraints?
Would really appreciate the help.



Thanks,
Divya


best fit - Dataframe and spark sql use cases

2016-05-09 Thread Divya Gehlot
Hi,
I would like to know the use cases where data frames are the best fit and the
use cases where Spark SQL is the best fit, based on your experience.


Thanks,
Divya


Found Data Quality check package for Spark

2016-05-06 Thread Divya Gehlot
Hi,
I just stumbled upon a data quality check package for Spark:
https://github.com/FRosner/drunken-data-quality

Has anybody used it?
Would really appreciate the feedback.




Thanks,
Divya


Re: [Spark 1.5.2 ]-how to set and get Storage level for Dataframe

2016-05-05 Thread Divya Gehlot
But why? Is there any specific reason behind it?
I am aware that we can persist the dataframes, but before proceeding
I would like to know the memory level of my DFs.
I am working on performance tuning of my Spark jobs and am looking for storage
level APIs like those for RDDs.




Thanks,
Divya

On 6 May 2016 at 11:16, Ted Yu <yuzhih...@gmail.com> wrote:

> I am afraid there is no such API.
>
> When persisting, you can specify StorageLevel :
>
>   def persist(newLevel: StorageLevel): this.type = {
>
> Can you tell us your use case ?
>
> Thanks
>
> On Thu, May 5, 2016 at 8:06 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>> How can I get and set storage level for Dataframes like RDDs ,
>> as mentioned in following  book links
>>
>> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-caching.html
>>
>>
>>
>> Thanks,
>> Divya
>>
>
>


[Spark 1.5.2 ]-how to set and get Storage level for Dataframe

2016-05-05 Thread Divya Gehlot
Hi,
How can I get and set storage level for Dataframes like RDDs ,
as mentioned in following  book links
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-caching.html



Thanks,
Divya


Fwd: package for data quality in Spark 1.5.2

2016-05-05 Thread Divya Gehlot
http://blog.cloudera.com/blog/2015/07/how-to-do-data-quality-checks-using-apache-spark-dataframes/
I am looking for something similar to above solution .
-- Forwarded message --
From: "Divya Gehlot" <divya.htco...@gmail.com>
Date: May 5, 2016 6:51 PM
Subject: package for data quality in Spark 1.5.2
To: "user @spark" <user@spark.apache.org>
Cc:

Hi,

Is there any package or project in Spark/Scala which supports data quality
checks?
For instance, checking null values or foreign key constraints.

I would really appreciate it if somebody who has already done this is happy to
share, or knows of any open source package.


Thanks,
Divya


package for data quality in Spark 1.5.2

2016-05-05 Thread Divya Gehlot
Hi,

Is there any package or project in Spark/Scala which supports data quality
checks?
For instance, checking null values or foreign key constraints.

I would really appreciate it if somebody who has already done this is happy to
share, or knows of any open source package.


Thanks,
Divya


Re: spark 1.6.1 build failure of : scala-maven-plugin

2016-05-04 Thread Divya Gehlot
Hi,

My Javac version

C:\Users\Divya>javac -version
javac 1.7.0_79

C:\Users\Divya>java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)

Do I need to use a higher version?


Thanks,
Divya

On 4 May 2016 at 21:31, sunday2000 <2314476...@qq.com> wrote:

> Check your javac version, and update it.
>
>
> -- Original Message ------
> *From:* "Divya Gehlot";<divya.htco...@gmail.com>;
> *Sent:* Wednesday, May 4, 2016, 11:25 AM
> *To:* "sunday2000"<2314476...@qq.com>;
> *Cc:* "user"<user@spark.apache.org>; "user"<u...@phoenix.apache.org>;
> *Subject:* Re: spark 1.6.1 build failure of : scala-maven-plugin
>
> Hi ,
> I am also getting a similar error
> Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
> when I try to build the Phoenix project using Maven.
> Maven version: 3.3
> Java version: 1.7_67
> Phoenix: downloaded latest master from GitHub
> If anybody finds the resolution, please share.
>
>
> Thanks,
> Divya
>
> On 3 May 2016 at 10:18, sunday2000 <2314476...@qq.com> wrote:
>
>> [INFO]
>> 
>> [INFO] BUILD FAILURE
>> [INFO]
>> 
>> [INFO] Total time: 14.765 s
>> [INFO] Finished at: 2016-05-03T10:08:46+08:00
>> [INFO] Final Memory: 35M/191M
>> [INFO]
>> 
>> [ERROR] Failed to execute goal
>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first)
>> on project spark-test-tags_2.10: Execution scala-compile-first of goal
>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed
>> -> [Help 1]
>> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
>> goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
>> (scala-compile-first) on project spark-test-tags_2.10: Execution
>> scala-compile-first of goal
>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
>> at
>> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224)
>> at
>> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
>> at
>> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
>> at
>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
>> at
>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
>> at
>> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
>> at
>> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
>> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
>> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
>> at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
>> at org.apache.maven.cli.MavenCli.execute(MavenCli.java:862)
>> at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:286)
>> at org.apache.maven.cli.MavenCli.main(MavenCli.java:197)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
>> at
>> org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
>> at
>> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
>> at
>> org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
>> Caused by: org.apache.maven.plugin.PluginExecutionException: Execution
>> scala-compile-first of goal
>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
>> at
>> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:145)
>> at
>> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
>> ... 20 more
>> Caus

Re: spark 1.6.1 build failure of : scala-maven-plugin

2016-05-03 Thread Divya Gehlot
Hi ,
I am also getting a similar error
Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
when I try to build the Phoenix project using Maven.
Maven version: 3.3
Java version: 1.7_67
Phoenix: downloaded latest master from GitHub
If anybody finds the resolution, please share.


Thanks,
Divya

On 3 May 2016 at 10:18, sunday2000 <2314476...@qq.com> wrote:

> [INFO]
> 
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 14.765 s
> [INFO] Finished at: 2016-05-03T10:08:46+08:00
> [INFO] Final Memory: 35M/191M
> [INFO]
> 
> [ERROR] Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first)
> on project spark-test-tags_2.10: Execution scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed
> -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
> goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
> (scala-compile-first) on project spark-test-tags_2.10: Execution
> scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
> at
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> at
> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
> at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
> at org.apache.maven.cli.MavenCli.execute(MavenCli.java:862)
> at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:286)
> at org.apache.maven.cli.MavenCli.main(MavenCli.java:197)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
> Caused by: org.apache.maven.plugin.PluginExecutionException: Execution
> scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
> at
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:145)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> ... 20 more
> Caused by: Compile failed via zinc server
> at
> sbt_inc.SbtIncrementalCompiler.zincCompile(SbtIncrementalCompiler.java:136)
> at
> sbt_inc.SbtIncrementalCompiler.compile(SbtIncrementalCompiler.java:86)
> at
> scala_maven.ScalaCompilerSupport.incrementalCompile(ScalaCompilerSupport.java:303)
> at
> scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:119)
> at
> scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:99)
> at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:482)
> at
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
> ... 21 more
> [ERROR]
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
> [ERROR]
> [ERROR] After correcting the problems, you can resume the build with the
> command
> [ERROR]   mvn  -rf :spark-test-tags_2.10


[Spark 1.5.2] Spark dataframes vs sql query -performance parameter ?

2016-05-03 Thread Divya Gehlot
Hi,
I would like to know on which parameters Spark DataFrames can be said to
perform better than SQL queries.
I would be grateful if somebody could explain this with use cases.

Thanks,
Divya


Re: [Spark 1.5.2]All data being written to only one part file rest part files are empty

2016-04-29 Thread Divya Gehlot
Hi ,

I observed that if I use a subset of the same dataset, or if the dataset is
small, it writes to many part files.
If the dataset grows, it writes to only one part file and all the other part
files are empty.


Thanks,
Divya

On 25 April 2016 at 23:15, nguyen duc tuan <newvalu...@gmail.com> wrote:

> Maybe the problem is the data itself. For example, the first dataframe
> might has common keys in only one part of the second dataframe. I think you
> can verify if you are in this situation by repartition one dataframe and
> join it. If this is the true reason, you might see the result distributed
> more evenly.
>
> 2016-04-25 9:34 GMT+07:00 Divya Gehlot <divya.htco...@gmail.com>:
>
>> Hi,
>>
>> After joining two dataframes, saving dataframe using Spark CSV.
>> But all the result data is being written to only one part file whereas
>> there are 200 part files being created, rest 199 part files are empty.
>>
>> What is the cause of uneven partitioning ? How can I evenly distribute
>> the data ?
>> Would really appreciate the help.
>>
>>
>> Thanks,
>> Divya
>>
>
>


Re: Cant join same dataframe twice ?

2016-04-27 Thread Divya Gehlot
When working with DataFrames and using explain to debug, I observed that
Spark gives a different tag number to the same DataFrame columns.
Like in this case:
val df1 = df2.join(df3,"Column1")
The line below throws the missing-columns error:
val df4 = df1.join(df3,"Column2")

For instance, df2 has 2 columns, and the df2 columns get tags like df2Col1#4,
df2Col2#5.
df3 has 4 columns, and the df3 columns get tags like
df3Col1#6,df3Col2#7,df3Col3#8,df3Col4#9
Now after joining, the df1 column tags will be
df2Col1#10,df2Col2#11,df3Col1#12,df3Col2#13,df3Col3#14,df3Col4#15

Now when df1 is joined again with df3, the df3 column tags change:
df2Col1#16,df2Col2#17,df3Col1#18
,df3Col2#19,df3Col3#20,df3Col4#21,df3Col2#23,df3Col3#24,df3Col4#25

But joining on df3Col1#12 refers to the previous DataFrame, and
that causes the issue.

Thanks,
Divya






On 27 April 2016 at 23:55, Ted Yu <yuzhih...@gmail.com> wrote:

> I wonder if Spark can provide better support for this case.
>
> The following schema is not user friendly (shown previsouly):
>
> StructField(b,IntegerType,false), StructField(b,IntegerType,false)
>
> Except for 'select *', there is no way for user to query any of the two
> fields.
>
> On Tue, Apr 26, 2016 at 10:17 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Based on my example, how about renaming columns?
>>
>> val df1 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>> val df2 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>> val df3 = df1.join(df2, "a").select($"a", df1("b").as("1-b"),
>> df2("b").as("2-b"))
>> val df4 = df3.join(df2, df3("2-b") === df2("b"))
>>
>> // maropu
>>
>> On Wed, Apr 27, 2016 at 1:58 PM, Divya Gehlot <divya.htco...@gmail.com>
>> wrote:
>>
>>> Correct Takeshi
>>> Even I am facing the same issue .
>>>
>>> How to avoid the ambiguity ?
>>>
>>>
>>> On 27 April 2016 at 11:54, Takeshi Yamamuro <linguin@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I tried;
>>>> val df1 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>>>> val df2 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>>>> val df3 = df1.join(df2, "a")
>>>> val df4 = df3.join(df2, "b")
>>>>
>>>> And I got; org.apache.spark.sql.AnalysisException: Reference 'b' is
>>>> ambiguous, could be: b#6, b#14.;
>>>> If same case, this message makes sense and this is clear.
>>>>
>>>> Thought?
>>>>
>>>> // maropu
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Apr 27, 2016 at 6:09 AM, Prasad Ravilla <pras...@slalom.com>
>>>> wrote:
>>>>
>>>>> Also, check the column names of df1 ( after joining df2 and df3 ).
>>>>>
>>>>> Prasad.
>>>>>
>>>>> From: Ted Yu
>>>>> Date: Monday, April 25, 2016 at 8:35 PM
>>>>> To: Divya Gehlot
>>>>> Cc: "user @spark"
>>>>> Subject: Re: Cant join same dataframe twice ?
>>>>>
>>>>> Can you show us the structure of df2 and df3 ?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Apr 25, 2016 at 8:23 PM, Divya Gehlot <divya.htco...@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am using Spark 1.5.2 .
>>>>>> I have a use case where I need to join the same dataframe twice on
>>>>>> two different columns.
>>>>>> I am getting error missing Columns
>>>>>>
>>>>>> For instance ,
>>>>>> val df1 = df2.join(df3,"Column1")
>>>>>> Below throwing error missing columns
>>>>>> val df 4 = df1.join(df3,"Column2")
>>>>>>
>>>>>> Is the bug or valid scenario ?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Divya
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


getting ClassCastException when calling UDF

2016-04-27 Thread Divya Gehlot
Hi,
I am using Spark 1.5.2 and defined   below udf

import org.apache.spark.sql.functions.udf
> val myUdf  = (wgts : Int , amnt :Float) => {
> (wgts*amnt)/100.asInstanceOf[Float]
> }
>



val df2 = df1.withColumn("WEIGHTED_AMOUNT",callUDF(udfcalWghts,
FloatType,col("RATE"),col("AMOUNT")))

In my schema RATE is in integerType and Amount FLOATTYPE

I am getting below error for

> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 106 in stage 89.0 failed 4 times, most recent failure: Lost task 106.3 in
> stage 89.0 (TID 7735, ip-xx-xx-xx-xxx.ap-southeast-1.compute.internal):
> java.lang.ClassCastException: java.lang.Double cannot be cast to
> java.lang.Float
> at scala.runtime.BoxesRunTime.unboxToFloat(BoxesRunTime.java:114)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(https://github.com/EclairJS/eclairjs-nashorn/issues/3


Can somebody help me with the resolution ?











Thanks,
Divya
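
One possible reading of the stack trace above: the value arriving from AMOUNT at runtime is a Double, so unboxing it into a Float parameter fails. A minimal sketch, assuming that is the cause, declares the UDF over Double and casts the input columns explicitly. The names `WeightedAmountSketch`, `weigh`, `weightedAmount` and `addWeightedAmount` are illustrative and not from the original post; `df1`, RATE and AMOUNT are taken from the message.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

object WeightedAmountSketch {
  // Plain Scala function: rate is an Int, amount is a Double, result is a Double.
  val weigh: (Int, Double) => Double = (rate, amount) => rate * amount / 100.0

  // Wrap the function as a Spark SQL UDF.
  val weightedAmount = udf(weigh)

  // Cast both inputs so the runtime types match the UDF signature.
  def addWeightedAmount(df1: DataFrame): DataFrame =
    df1.withColumn(
      "WEIGHTED_AMOUNT",
      weightedAmount(col("RATE").cast("int"), col("AMOUNT").cast("double")))
}
```

Note also that the snippet in the post defines `myUdf` but passes `udfcalWghts` to `callUDF`, so it may be worth checking that the function actually invoked is the one shown.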


Re: removing header from csv file

2016-04-26 Thread Divya Gehlot
Yes, you can remove the header by removing the first row.

You can use first() or head() to get that row and then filter it out.


Thanks,
Divya

On 27 April 2016 at 13:24, Ashutosh Kumar  wrote:

> I see there is a library spark-csv which can be used for removing header
> and processing of csv files. But it seems it works with sqlcontext only. Is
> there a way to remove header from csv files without sqlcontext ?
>
> Thanks
> Ashutosh
>
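
A minimal sketch of header removal with plain RDDs, with no SQLContext involved. It assumes a single input file whose first line is the header; the object and method names are illustrative. The naive `split(",")` does not handle quoted fields that contain commas.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object CsvHeaderSketch {
  // Drops the first line of partition 0, which is where the header of a
  // single text file ends up when it is read with textFile.
  def dropHeader(lines: RDD[String]): RDD[String] =
    lines.mapPartitionsWithIndex { (index, iter) =>
      if (index == 0) iter.drop(1) else iter
    }

  // Reads the file, removes the header and splits each line on commas.
  def readWithoutHeader(sc: SparkContext, path: String): RDD[Array[String]] =
    dropHeader(sc.textFile(path)).map(_.split(",", -1))
}
```

If the input is a directory of several files that each carry a header, only the first header is dropped by this approach; filtering out lines equal to `lines.first()` is an alternative in that case.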


Re: Cant join same dataframe twice ?

2016-04-26 Thread Divya Gehlot
Correct Takeshi
Even I am facing the same issue .

How to avoid the ambiguity ?


On 27 April 2016 at 11:54, Takeshi Yamamuro <linguin@gmail.com> wrote:

> Hi,
>
> I tried;
> val df1 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
> val df2 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
> val df3 = df1.join(df2, "a")
> val df4 = df3.join(df2, "b")
>
> And I got; org.apache.spark.sql.AnalysisException: Reference 'b' is
> ambiguous, could be: b#6, b#14.;
> If same case, this message makes sense and this is clear.
>
> Thought?
>
> // maropu
>
>
>
>
>
>
>
> On Wed, Apr 27, 2016 at 6:09 AM, Prasad Ravilla <pras...@slalom.com>
> wrote:
>
>> Also, check the column names of df1 ( after joining df2 and df3 ).
>>
>> Prasad.
>>
>> From: Ted Yu
>> Date: Monday, April 25, 2016 at 8:35 PM
>> To: Divya Gehlot
>> Cc: "user @spark"
>> Subject: Re: Cant join same dataframe twice ?
>>
>> Can you show us the structure of df2 and df3 ?
>>
>> Thanks
>>
>> On Mon, Apr 25, 2016 at 8:23 PM, Divya Gehlot <divya.htco...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am using Spark 1.5.2 .
>>> I have a use case where I need to join the same dataframe twice on two
>>> different columns.
>>> I am getting error missing Columns
>>>
>>> For instance ,
>>> val df1 = df2.join(df3,"Column1")
>>> Below throwing error missing columns
>>> val df 4 = df1.join(df3,"Column2")
>>>
>>> Is the bug or valid scenario ?
>>>
>>>
>>>
>>>
>>> Thanks,
>>> Divya
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Cant join same dataframe twice ?

2016-04-25 Thread Divya Gehlot
Hi,
I am using Spark 1.5.2 .
I have a use case where I need to join the same dataframe twice on two
different columns.
I am getting a missing-columns error.

For instance,
val df1 = df2.join(df3,"Column1")
The line below throws the missing-columns error:
val df4 = df1.join(df3,"Column2")

Is this a bug or a valid scenario?




Thanks,
Divya
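
A hedged sketch of one common way around this: give each use of the repeated DataFrame its own alias and join on qualified column names. It assumes that both frames contain Column1 and Column2; `base` stands for df2 and `lookup` for df3 from the post, and the aliases `b`, `l1` and `l2` are made up for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object JoinTwiceSketch {
  // Joins `lookup` to `base` twice, once per key column, under distinct
  // aliases so that every column reference stays unambiguous.
  def joinTwice(base: DataFrame, lookup: DataFrame): DataFrame = {
    val byColumn1 = lookup.as("l1")
    val byColumn2 = lookup.as("l2")
    base.as("b")
      .join(byColumn1, col("b.Column1") === col("l1.Column1"))
      .join(byColumn2, col("b.Column2") === col("l2.Column2"))
  }
}
```

The result keeps the columns of all three inputs, so a following `select` with qualified names such as `col("l2.Column2")` is usually needed to pick the wanted ones.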


[Spark 1.5.2]All data being written to only one part file rest part files are empty

2016-04-24 Thread Divya Gehlot
Hi,

After joining two dataframes, saving dataframe using Spark CSV.
But all the result data is being written to only one part file whereas
there are 200 part files being created, rest 199 part files are empty.

What is the cause of uneven partitioning ? How can I evenly distribute the
data ?
Would really appreciate the help.


Thanks,
Divya
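
The 200 part files match the default of `spark.sql.shuffle.partitions`. If the join key has very few distinct values, hash partitioning can place every row in the same partition, which would explain one full file and 199 empty ones. A minimal sketch, assuming that is the situation, checks the row count per partition and repartitions before writing with spark-csv. The object and method names are illustrative.

```scala
import org.apache.spark.sql.DataFrame

object EvenPartFilesSketch {
  // Number of rows held by each partition, useful to confirm the skew.
  def rowsPerPartition(df: DataFrame): Array[Int] =
    df.rdd.mapPartitions(iter => Iterator(iter.size)).collect()

  // Shuffles the rows into `numFiles` partitions of roughly equal size and
  // writes one CSV part file per partition through spark-csv.
  def saveEvenly(df: DataFrame, path: String, numFiles: Int): Unit =
    df.repartition(numFiles)
      .write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save(path)
}
```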


Re: Spark DataFrame sum of multiple columns

2016-04-22 Thread Divya Gehlot
Easy way of doing it

newdf = df.withColumn('total', sum(df[col] for col in df.columns))


On 22 April 2016 at 11:51, Naveen Kumar Pokala 
wrote:

> Hi,
>
>
>
> Do we have any way to perform Row level operations in spark dataframes.
>
>
>
>
>
> For example,
>
>
>
> I have a dataframe with columns from A,B,C,…Z.. I want to add one more
> column New Column with sum of all column values.
>
>
>
> A | B | C | D | ... | Z  | New Column
> 1 | 2 | 4 | 3 | ... | 26 | 351
>
>
>
>
>
> Can somebody help me on this?
>
>
>
>
>
> Thanks,
>
> Naveen
>
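
The one-liner in the reply is PySpark. A rough Scala equivalent, assuming every column is numeric, could look like the sketch below. The names `RowSumSketch` and `withTotal` are illustrative.

```scala
import org.apache.spark.sql.DataFrame

object RowSumSketch {
  // Adds a "total" column holding the sum of all existing columns.
  def withTotal(df: DataFrame): DataFrame = {
    // Build one Column expression: col1 + col2 + ... + colN.
    val total = df.columns.map(name => df(name)).reduce(_ + _)
    df.withColumn("total", total)
  }
}
```

A null in any column makes the total null for that row, so wrapping each column in `coalesce(..., lit(0))` may be needed for sparse data.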


[Ask :]Best Practices - Application logging in Spark 1.5.2 + Scala 2.10

2016-04-21 Thread Divya Gehlot
Hi,
I am using Spark with a Hadoop 2.7 cluster.
I need to write all my print statements and any errors to a file, for
instance some info if a certain level is passed, or an error if something is
missing in my Spark Scala script.

Can somebody help me or point me to a tutorial, blog, or book?
What is the best way to achieve this?

Thanks in advance.

Divya
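
A minimal sketch of application-level logging with the log4j 1.2 API that Spark 1.5.x ships with. The logger name `com.example.MyJob` and the `run` method are placeholders; where the output goes (console or file) is decided by the log4j.properties in use, not by this code.

```scala
import org.apache.log4j.{Level, Logger}

object JobLoggingSketch {
  // Named logger; the name can be targeted from log4j.properties.
  val log: Logger = Logger.getLogger("com.example.MyJob")

  def run(inputRows: Long): Unit = {
    log.setLevel(Level.INFO)
    log.info(s"Starting job with $inputRows input rows")
    if (inputRows == 0L) log.warn("Input is empty, nothing to process")
    try {
      require(inputRows >= 0L, "row count must not be negative")
    } catch {
      case e: IllegalArgumentException =>
        // Log the failure with its stack trace, then rethrow it.
        log.error("Validation failed", e)
        throw e
    }
  }
}
```

This logger lives on the driver. Code that runs inside executors (for example inside `map`) logs through the executor's own log4j configuration.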


[Spark 1.5.2] Log4j Configuration for executors

2016-04-18 Thread Divya Gehlot
Hi,
I tried configuring logging to write to a file for the Spark driver and
executors.
I have two separate log4j properties files, for the Spark driver and executor
respectively.
It is writing the log for the Spark driver, but for the executor logs I am
getting the error below:

java.io.FileNotFoundException: /home/hdfs/spark_executor.log (Permission
> denied)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)



Why is it giving permission denied for the executor log when it is writing the
driver logs?

Am I missing any settings?


Would really appreciate the help.



Thanks,

Divya


Fwd: [Help]:Strange Issue :Debug Spark Dataframe code

2016-04-17 Thread Divya Gehlot
Reposting again as unable to find the root cause where things are going
wrong.

Experts please help .


-- Forwarded message --
From: Divya Gehlot <divya.htco...@gmail.com>
Date: 15 April 2016 at 19:13
Subject: [Help]:Strange Issue :Debug Spark Dataframe code
To: "user @spark" <user@spark.apache.org>


Hi,
I am using Spark 1.5.2 with Scala 2.10.
Is there any other option apart from "explain(true)" to debug Spark
Dataframe code .

I am facing a strange issue.
I have a lookup dataframe and am joining it to another dataframe on different
columns.

I am getting an *Analysis exception* in the third join.
When I checked the logical plan, it is using the same reference for the key,
but the references of the selected columns are changing.
For example
df1 = COLUMN1#15,COLUMN2#16,COLUMN3#17

In the first two joins
I am getting the same references and the join succeeds.
For the first two joins, for the column COLUMN1#15 I am getting COLUMN2#16 and
COLUMN3#17.

But at the third join COLUMN1#15 is the same while the other column references
are updated to COLUMN2#167,COLUMN3#168

Its throwing Spark Analysis Exception

> org.apache.spark.sql.AnalysisException: resolved attribute(s) COLUMN1#15
> missing from


after two joins,the  dataframe has more than 25 columns

Could anybody help light the path by holding the torch.
Would really appreciate the help.

Thanks,
Divya


[Help]:Strange Issue :Debug Spark Dataframe code

2016-04-15 Thread Divya Gehlot
Hi,
I am using Spark 1.5.2 with Scala 2.10.
Is there any other option apart from "explain(true)" to debug Spark
Dataframe code .

I am facing a strange issue.
I have a lookup dataframe and am joining it to another dataframe on different
columns.

I am getting an *Analysis exception* in the third join.
When I checked the logical plan, it is using the same reference for the key,
but the references of the selected columns are changing.
For example
df1 = COLUMN1#15,COLUMN2#16,COLUMN3#17

In the first two joins
I am getting the same references and the join succeeds.
For the first two joins, for the column COLUMN1#15 I am getting COLUMN2#16 and
COLUMN3#17.

But at the third join COLUMN1#15 is the same while the other column references
are updated to COLUMN2#167,COLUMN3#168

Its throwing Spark Analysis Exception

> org.apache.spark.sql.AnalysisException: resolved attribute(s) COLUMN1#15
> missing from


after two joins,the  dataframe has more than 25 columns

Could anybody help light the path by holding the torch.
Would really appreciate the help.

Thanks,
Divya


Memory needs when using expensive operations like groupBy

2016-04-13 Thread Divya Gehlot
Hi,
I am using Spark 1.5.2 with Scala 2.10 and my Spark job keeps failing with
exit code 143 .
except one job where I am using unionAll and groupBy operation on multiple
columns .

Please advise me on the options to optimize it.
The one option which I am using now is:
--conf spark.executor.extraJavaOptions  -XX:MaxPermSize=1024m
-XX:PermSize=256m --conf spark.driver.extraJavaOptions
 -XX:MaxPermSize=1024m -XX:PermSize=256m --conf
spark.yarn.executor.memoryOverhead=1024

Need to know the best practices/better ways to optimize code.

Thanks,
Divya


[ASK]:Dataframe number of column limit in Spark 1.5.2

2016-04-12 Thread Divya Gehlot
Hi,
I would like to know whether the Spark DataFrame API has a limit on the number
of columns that can be created.

Thanks,
Divya


[HELP:]Save Spark Dataframe in Phoenix Table

2016-04-07 Thread Divya Gehlot
Hi,
I have a Hortonworks Hadoop cluster with the configuration below:
Spark 1.5.2
HBASE 1.1.x
Phoenix 4.4

I am able to connect to Phoenix through JDBC connection and able to read
the Phoenix tables .
But while writing the data back to Phoenix table
I am getting below error :

org.apache.spark.sql.AnalysisException:
org.apache.phoenix.spark.DefaultSource does not allow user-specified
schemas.;

Can anybody help in resolving the above error, or suggest any other way of
saving Spark DataFrames to Phoenix?

Would really appreciate the help.

Thanks,
Divya
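
A hedged sketch of a write through the phoenix-spark data source, following the pattern shown in the Phoenix documentation. As far as I know, the "does not allow user-specified schemas" error is usually raised when an explicit schema is handed to this source, so the sketch passes none. The object name, the method name and its parameters are illustrative; the target table is assumed to exist already with column names matching the DataFrame.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object PhoenixSaveSketch {
  // Writes `df` to an existing Phoenix table.
  // `table` is the Phoenix table name and `zkUrl` the ZooKeeper quorum,
  // for example "zkhost:2181" (a placeholder value).
  def saveToPhoenix(df: DataFrame, table: String, zkUrl: String): Unit =
    df.write
      .format("org.apache.phoenix.spark")
      .mode(SaveMode.Overwrite)
      .option("table", table)
      .option("zkUrl", zkUrl)
      .save()
}
```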


[Spark-1.5.2]Spark Memory Issue while Saving to HDFS and Phoenix both

2016-04-01 Thread Divya Gehlot
I forgot to mention:
I am using the DataFrame API for all the operations, not SQL queries.

-- Forwarded message --
From: Divya Gehlot <divya.htco...@gmail.com>
Date: 1 April 2016 at 18:35
Subject: [Spark-1.5.2]Spark Memory Issue while Saving to HDFS and Phoenix
both
To: "user @spark" <user@spark.apache.org>


Hi,
I have Hadoop Hortonworks  3 NODE  Cluster on EC2 with
*Hadoop *version 2.7.x
*Spark *version - 1.5.2
*Phoenix *version - 4.4
*Hbase *version 1.1.x

*Cluster Statistics *
Data Node 1
OS: redhat7 (x86_64), Cores (CPU): 2 (2), Disk: 20.69GB/99.99GB (20.69% used),
Memory: 7.39GB
Data Node 2
Cores (CPU): 2 (2), Disk: 20.73GB/99.99GB (20.73% used), Memory: 7.39GB,
Load Avg: 0.00, Heartbeat: a moment ago, Current Version: 2.3.4.0-3485
*NameNode*
Rack: /default-rack, OS: redhat7 (x86_64), Cores (CPU): 4 (4),
Disk: 32.4GB/99.99GB (32.4% used), Memory: 15.26GB, Load Avg: 0.78,
Heartbeat: a moment ago, Current Version: 2.3.4.0-3485

*Spark Queue Statistics *

> Queue State: RUNNING
> Used Capacity: 0.0%
> Configured Capacity: 100.0%
> Configured Max Capacity: 100.0%
> Absolute Used Capacity: 0.0%
> Absolute Configured Capacity: 100.0%
> Absolute Configured Max Capacity: 100.0%
> Used Resources: <memory:0, vCores:0>
> Num Schedulable Applications: 0
> Num Non-Schedulable Applications: 0
> Num Containers: 0
> Max Applications: 1
> Max Applications Per User: 1
> Max Application Master Resources: <memory:3072, vCores:1>
> Used Application Master Resources: <memory:0, vCores:0>
> Max Application Master Resources Per User: <memory:3072, vCores:1>
> Configured Minimum User Limit Percent: 100%
> Configured User Limit Factor: 1.0
> Accessible Node Labels: *
> Ordering Policy: FifoOrderingPolicy
> Preemption: disabled



I have a Spark Scala script
which performs many operations: reading from the
DB (Phoenix), joins (inner, left outer), unionAll and finally groupBy, then
saving the result set to Phoenix/HDFS.
I have created 20+ DataFrames for the operations mentioned above.

stackTrace :

> 16/04/01 10:11:49 WARN TaskSetManager: Lost task 3.0 in stage 132.4 (TID
> 18401, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal):
> java.lang.OutOfMemoryError: PermGen space
> at sun.misc.Unsafe.defineClass(Native Method)
> at sun.reflect.ClassDefiner.defineClass(ClassDefiner.java:63)
> at
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
> at
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
> at java.security.AccessController.doPrivileged(Native Method)
> at
> sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
> at
> sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
> at
> sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
> at
> java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
> at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
> at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)




For Phoenix, I am getting an error similar to the one below in my stack trace:

>
> $provider.DefaultSource does not allow user-specified schemas


The whole job takes almost 3-4 minutes, and the save itself takes
3-4 minutes, whether it is to Phoenix or HDFS.

Could somebody help me resolve the above-mentioned issue?

Would really appreciate the help.


Thanks,

Divya


[Spark-1.5.2]Spark Memory Issue while Saving to HDFS and Phoenix both

2016-04-01 Thread Divya Gehlot
Hi,
I have Hadoop Hortonworks  3 NODE  Cluster on EC2 with
*Hadoop *version 2.7.x
*Spark *version - 1.5.2
*Phoenix *version - 4.4
*Hbase *version 1.1.x

*Cluster Statistics *
Data Node 1
OS: redhat7 (x86_64), Cores (CPU): 2 (2), Disk: 20.69GB/99.99GB (20.69% used),
Memory: 7.39GB
Data Node 2
Cores (CPU): 2 (2), Disk: 20.73GB/99.99GB (20.73% used), Memory: 7.39GB,
Load Avg: 0.00, Heartbeat: a moment ago, Current Version: 2.3.4.0-3485
*NameNode*
Rack: /default-rack, OS: redhat7 (x86_64), Cores (CPU): 4 (4),
Disk: 32.4GB/99.99GB (32.4% used), Memory: 15.26GB, Load Avg: 0.78,
Heartbeat: a moment ago, Current Version: 2.3.4.0-3485

*Spark Queue Statistics *

> Queue State: RUNNING
> Used Capacity: 0.0%
> Configured Capacity: 100.0%
> Configured Max Capacity: 100.0%
> Absolute Used Capacity: 0.0%
> Absolute Configured Capacity: 100.0%
> Absolute Configured Max Capacity: 100.0%
> Used Resources: <memory:0, vCores:0>
> Num Schedulable Applications: 0
> Num Non-Schedulable Applications: 0
> Num Containers: 0
> Max Applications: 1
> Max Applications Per User: 1
> Max Application Master Resources: <memory:3072, vCores:1>
> Used Application Master Resources: <memory:0, vCores:0>
> Max Application Master Resources Per User: <memory:3072, vCores:1>
> Configured Minimum User Limit Percent: 100%
> Configured User Limit Factor: 1.0
> Accessible Node Labels: *
> Ordering Policy: FifoOrderingPolicy
> Preemption: disabled



I have a Spark Scala script
which performs many operations: reading from the
DB (Phoenix), joins (inner, left outer), unionAll and finally groupBy, then
saving the result set to Phoenix/HDFS.
I have created 20+ DataFrames for the operations mentioned above.

stackTrace :

> 16/04/01 10:11:49 WARN TaskSetManager: Lost task 3.0 in stage 132.4 (TID
> 18401, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal):
> java.lang.OutOfMemoryError: PermGen space
> at sun.misc.Unsafe.defineClass(Native Method)
> at sun.reflect.ClassDefiner.defineClass(ClassDefiner.java:63)
> at
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
> at
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
> at java.security.AccessController.doPrivileged(Native Method)
> at
> sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
> at
> sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
> at
> sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
> at
> java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
> at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
> at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)




For Phoenix, I am getting an error similar to the one below in my stack trace:

>
> $provider.DefaultSource does not allow user-specified schemas


The whole job takes almost 3-4 minutes, and the save itself takes
3-4 minutes, whether it is to Phoenix or HDFS.

Could somebody help me resolve the above-mentioned issue?

Would really appreciate the help.


Thanks,

Divya


Change TimeZone Setting in Spark 1.5.2

2016-03-29 Thread Divya Gehlot
Hi,

The Spark set up is  on Hadoop cluster.
How can I set up the Spark timezone to sync with Server Timezone ?
Any idea?


Thanks,
Divya


Re: [SQL] Two columns in output vs one when joining DataFrames?

2016-03-28 Thread Divya Gehlot
Hi Jacek ,

The difference is mentioned in the Spark documentation itself:

Note that if you perform a self-join using this function without aliasing
the input DataFrames, you will NOT be able to reference any columns after the
join, since there is no way to disambiguate which side of the join you would
like to reference.

On 26 March 2016 at 04:19, Jacek Laskowski  wrote:

> Hi,
>
> I've read the note about both columns included when DataFrames are
> joined, but don't think it differentiated between versions of join. Is
> this a feature or a bug that the following session shows one _1 column
> with Seq("_1") and two columns for ===?
>
> {code}
> scala> left.join(right, Seq("_1")).show
> +---+---+---+
> | _1| _2| _2|
> +---+---+---+
> |  1|  a|  a|
> |  2|  b|  b|
> +---+---+---+
>
>
> scala> left.join(right, left("_1") === right("_1")).show
> +---+---+---+---+
> | _1| _2| _1| _2|
> +---+---+---+---+
> |  1|  a|  1|  a|
> |  2|  b|  2|  b|
> +---+---+---+---+
> {code}
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


[Spark -1.5.2]Dynamically creation of caseWhen expression

2016-03-23 Thread Divya Gehlot
Hi,
I have a map collection.
I am trying to build a when condition based on the key values,
like df.withColumn("ID", when(condition with map keys, values of map))

How can I do that dynamically?
Currently I am iterating over keysIterator and getting the values:
val keys = myMap.keysIterator.toArray
Like below:
df.withColumn("ID",when(condition on keys(0),lit(myMap get
keys(0))).when(condition on keys(1),lit(myMap get keys(1))).
when(condition on keys(2),lit(myMap get keys(2))).otherwise("value not
found"))

How can I build the above expression dynamically?
Like for (key <-keys){
when(condition on key ,lit(myMap get key))
}
Would really appreciate the help.

Thanks,
Divya
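
One way to build such a chain dynamically is to fold over the map entries, starting from the first `when` and appending one `.when` per remaining entry. The sketch below assumes the "condition on key" is plain equality between a column and the map key, and that keys are Int and values String; `CaseWhenSketch` and `caseWhen` are illustrative names.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{lit, when}

object CaseWhenSketch {
  // Builds when(key === k1, v1).when(key === k2, v2)...otherwise(default).
  def caseWhen(key: Column, mapping: Map[Int, String], default: String): Column = {
    require(mapping.nonEmpty, "mapping must contain at least one entry")
    val (firstKey, firstValue) = mapping.head
    // Start the chain with the first entry, then append the remaining ones.
    val chain = mapping.tail.foldLeft(when(key === firstKey, lit(firstValue))) {
      case (acc, (k, v)) => acc.when(key === k, lit(v))
    }
    chain.otherwise(lit(default))
  }
}
```

Usage would then be along the lines of `df.withColumn("ID", CaseWhenSketch.caseWhen(df("someColumn"), myMap, "value not found"))`, where `someColumn` is a placeholder for whichever column the condition tests.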


find the matching and get the value

2016-03-22 Thread Divya Gehlot
Hi,
I am using Spark1.5.2
My requirement is as below

df.withColumn("NoOfDays",lit(datediff(df("Start_date"),df("end_date"))))


Now I have to add one more column where my datediff(Start_date,end_date)
should match the map keys.

The map looks like MyMap(1->1D,2->2D,3->3M,4->4W)

I want to do something like this:

> val
> condition= MyMap.contains(lit(datediff(df("END_DATE"),df("START_DATE"))))
> val getId =MyMap(datediff(df("END_DATE"),df("START_DATE")))
> df.withColumn("AddColumn",when(condition,lit(getId)))


Is it possible ?

What I am missing here ..
I am beginner in scala and Spark.

Would really appreciate the help.

Thanks,
Divya


Re: declare constant as date

2016-03-21 Thread Divya Gehlot
Oh my my I am so silly

I can declare it as string and cast it to date

My apologies for Spamming the mailing list.

Thanks,
Divya

On 21 March 2016 at 14:51, Divya Gehlot <divya.htco...@gmail.com> wrote:

> Hi,
> In Spark 1.5.2
> Do we have any utility which converts a constant value as shown below,
> or can we declare a date variable like val start_date :Date = "2015-03-02"
>
> val start_date = "2015-03-02" toDate
> like how we convert with toInt, toString
> I searched for it but couldn't find it
>
>
> Thanks,
> Divya
>
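
The string-then-cast idea from the reply can be sketched as follows, together with a driver-side alternative based on `java.sql.Date`. The object and value names are illustrative.

```scala
import java.sql.Date
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit

object DateConstantSketch {
  // Driver-side constant: java.sql.Date parses the yyyy-[m]m-[d]d form.
  val startDate: Date = Date.valueOf("2015-03-02")

  // Column-side constant: a string literal cast to Spark SQL's date type,
  // usable in withColumn, filter and similar DataFrame calls.
  val startDateColumn: Column = lit("2015-03-02").cast("date")
}
```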


declare constant as date

2016-03-21 Thread Divya Gehlot
Hi,
In Spark 1.5.2
Do we have any utility which converts a constant value as shown below,
or can we declare a date variable like val start_date :Date = "2015-03-02"

val start_date = "2015-03-02" toDate
like how we convert with toInt, toString
I searched for it but couldn't find it


Thanks,
Divya


Get the number of days dynamically in with Column

2016-03-20 Thread Divya Gehlot
I have a time stamping table which has data like
No of Days | ID
1          | 1D
2          | 2D



and so on till 30 days

I have another DataFrame with
start date and end date.
I need to get the difference between these two dates, get the ID from the
Time Stamping table and add it with withColumn.

The tedious solution is


val dfTimeStamping = df.withColumn("ID",when(Diff between Start date and
Enddate ,"1D").when(Diff between Start date and Enddate ,"2D")).. have to
do till 30 days .

How can I do it dynamically ?


Thanks,
Divya
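
Instead of chaining 30 `when` calls, the time stamping table can be joined on the computed difference. A minimal sketch under stated assumptions: the second DataFrame has columns `start_date` and `end_date`, and the time stamping table has columns `NoOfDays` (integer) and `ID`. These column names, like the object and method names, are illustrative.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, datediff}

object DayLookupSketch {
  // Computes end_date - start_date in days and looks up the matching ID.
  // A left outer join keeps rows whose difference has no entry in `lookup`.
  def withDayId(events: DataFrame, lookup: DataFrame): DataFrame = {
    val withDiff = events.withColumn(
      "DiffDays", datediff(col("end_date"), col("start_date")))
    withDiff
      .join(lookup, withDiff("DiffDays") === lookup("NoOfDays"), "left_outer")
      .drop("NoOfDays")
  }
}
```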


[Spark-1.5.2]Column renaming with withColumnRenamed has no effect

2016-03-19 Thread Divya Gehlot
Hi,
I am adding a new column and renaming it at the same time, but the renaming
does not have any effect.

dffiltered =
> dffiltered.unionAll(dfresult.withColumn("Col1",lit("value1").withColumn("Col2",lit("value2")).cast("int")).withColumn("Col3",lit("values3")).withColumnRenamed("Col1","Col1Rename").drop("Col1")


Can anybody help me by pointing out my mistake?

Thanks,
Divya
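
A possible explanation, offered as an assumption: `unionAll` matches columns by position and the result keeps the column names of the left-hand DataFrame, so a rename applied only to the right-hand side is not visible afterwards. A minimal sketch that renames after the union instead (`UnionRenameSketch` and `unionThenRename` are illustrative names):

```scala
import org.apache.spark.sql.DataFrame

object UnionRenameSketch {
  // The union takes its column names from `left`, so the rename is applied
  // to the combined result rather than to `right`.
  def unionThenRename(left: DataFrame, right: DataFrame): DataFrame =
    left.unionAll(right).withColumnRenamed("Col1", "Col1Rename")
}
```

The parentheses in the posted snippet also look misplaced: `.withColumn` appears to be called on `lit("value1")`, which is a Column and not a DataFrame.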


[Error] : dynamically union All + adding new column

2016-03-19 Thread Divya Gehlot
Hi,
I am dynamically doing union all and adding new column too

val dfresult =
> dfAcStamp.select("Col1","Col1","Col3","Col4","Col5","Col6","col7","col8","col9")
> val schemaL = dfresult.schema
> var dffiltered = sqlContext.createDataFrame(sc.emptyRDD[Row], schemaL)
> for ((key,values) <- lcrMap) {
> if(values(4) != null){
>  println("Condition="+values(4))
>  val renameRepId = values(0)+"REP_ID"
>  dffiltered.printSchema
> dfresult.printSchema
>  dffiltered =
> dffiltered.unionAll(dfresult.withColumn(renameRepId,lit(values(3))).drop("Col9").select("Col1","Col1","Col3","Col4","Col5","Col6","Col7","Col8","Col9").where(values(4))).distinct()


> }
> }



when I am printing the schema
dfresult
root
 |-- Col1: date (nullable = true)
 |-- Col2: date (nullable = true)
 |-- Col3: string (nullable = false)
 |-- Col4: string (nullable = false)
 |-- Col5: string (nullable = false)
 |-- Col6: string (nullable = true)
 |-- Col7: string (nullable = true)
 |-- Col8: string (nullable = true)
 |-- Col9: null (nullable = true)


dffiltered Schema
root
 |-- Col1: date (nullable = true)
 |-- Col2: date (nullable = true)
 |-- Col3: string (nullable = false)
 |-- Col4: string (nullable = false)
 |-- Col5: string (nullable = false)
 |-- Col6: string (nullable = true)
 |-- Col7: string (nullable = true)
 |-- Col8: string (nullable = true)
 |-- Col9: null (nullable = true)


It prints the same schema for both, but when I do unionAll it gives me the
error below:
org.apache.spark.sql.AnalysisException: Union can only be performed on
tables with the same number of columns, but the left table has 9 columns
and the right has 8;

Could somebody help me by pointing out my mistake?


Thanks,


convert row to map of key as int and values as arrays

2016-03-15 Thread Divya Gehlot
Hi,
As I can't add columns from another DataFrame,
I am planning to convert my row columns to a map of keys and arrays.
As I am new to Scala and Spark,
I am trying it like below:

// create an empty map
import scala.collection.mutable.{ArrayBuffer => mArrayBuffer}
var map = Map[Int,mArrayBuffer[Any]]()


def addNode(key: String, value:ArrayBuffer[Any] ) ={
nodes += (key -> (value :: (nodes get key getOrElse Nil)))
 }

  var rows = dfLnItmMappng.collect()
rows.foreach(r =>  addNode(r.getInt(2),
(r.getString(1),r.getString(3),r.getString(4),r.getString(5
for ((k,v) <- rows)
printf("key: %s, value: %s\n", k, v)

But I am getting below error :
import scala.collection.mutable.{ArrayBuffer=>mArrayBuffer}
map:
scala.collection.immutable.Map[Int,scala.collection.mutable.ArrayBuffer[Any]]
= Map()
<console>:28: error: not found: value nodes
nodes += (key -> (value :: (nodes get key getOrElse Nil)))
^
<console>:27: error: not found: type ArrayBuffer
   def addNode(key: String, value:ArrayBuffer[Any] ) ={



If anybody knows a better method to add columns from another
DataFrame, please let me know.


Thanks,
Divya
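
The two compiler errors point at names that are not in scope: `nodes` is never declared, and `ArrayBuffer` was imported under the alias `mArrayBuffer`. A minimal plain-Scala sketch of the grouping step, with no Spark dependency, is below; `GroupRowsSketch` and `groupValues` are illustrative names.

```scala
import scala.collection.mutable

object GroupRowsSketch {
  // Collects every value that shares a key into one ArrayBuffer per key.
  def groupValues[K, V](pairs: Seq[(K, V)]): Map[K, mutable.ArrayBuffer[V]] = {
    val acc = mutable.Map.empty[K, mutable.ArrayBuffer[V]]
    for ((key, value) <- pairs)
      acc.getOrElseUpdate(key, mutable.ArrayBuffer.empty[V]) += value
    acc.toMap
  }
}
```

With the collected rows from the post, the input pairs could be built as `rows.toSeq.map(r => r.getInt(2) -> Seq(r.getString(1), r.getString(3), r.getString(4), r.getString(5)))`, assuming those column positions hold the key and the string values.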


[How To :]Custom Logging of Spark Scala scripts

2016-03-14 Thread Divya Gehlot
Hi,
Can somebody point out how I can configure custom logs for my Spark (Scala)
scripts,
so that I can see at which level my script failed and why?


Thanks,
Divya

