Unable to access hive table (created through hive context) in hive console

2015-12-07 Thread Divya Gehlot
Hi,

I am a newbie to Spark and am using HDP 2.2, which comes with Spark 1.3.1.
I tried the following code example:

> import org.apache.spark.sql.SQLContext
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext.implicits._
>
> val personFile = "/user/hdfs/TestSpark/Person.csv"
> val df = sqlContext.load(
> "com.databricks.spark.csv",
> Map("path" -> personFile, "header" -> "true", "inferSchema" -> "true"))
> df.printSchema()
> val selectedData = df.select("Name", "Age")
> selectedData.save("NewPerson.csv", "com.databricks.spark.csv")
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> hiveContext.sql("CREATE TABLE IF NOT EXISTS PersonTable (Name STRING, Age
> STRING)")
> hiveContext.sql("LOAD DATA  INPATH '/user/hdfs/NewPerson.csv' INTO TABLE
> PersonTable")
> hiveContext.sql("from PersonTable SELECT Name, Age
> ").collect.foreach(println)


I am able to access the above table in HDFS:

> [hdfs@sandbox ~]$ hadoop fs -ls /user/hive/warehouse/persontable
> Found 3 items
> -rw-r--r--   1 hdfs hdfs  0 2015-12-08 04:40
> /user/hive/warehouse/persontable/_SUCCESS
> -rw-r--r--   1 hdfs hdfs 47 2015-12-08 04:40
> /user/hive/warehouse/persontable/part-0
> -rw-r--r--   1 hdfs hdfs 33 2015-12-08 04:40
> /user/hive/warehouse/persontable/part-1


But when I run show tables in the hive console, I couldn't find the table.

> hive> use default ;
> OK
> Time taken: 0.864 seconds
> hive> show tables;
> OK
> dataframe_test
> sample_07
> sample_08
> Time taken: 0.521 seconds, Fetched: 3 row(s)
> hive> use xademo ;
> OK
> Time taken: 0.791 seconds
> hive> show tables;
> OK
> call_detail_records
> customer_details
> recharge_details
> Time taken: 0.256 seconds, Fetched: 3 row(s)


Can somebody point me in the right direction: is something wrong with the
code, or am I misunderstanding the concepts?
Would really appreciate your help.

Thanks,
Divya
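
For reference, a quick sanity check for this kind of mismatch is to list the
tables that the same HiveContext can see; a minimal sketch, using only the
hiveContext created above:

hiveContext.sql("SHOW TABLES").collect.foreach(println)
// If PersonTable shows up here but not in the hive CLI, the Spark shell is
// most likely talking to a different metastore than the hive console (for
// example a local Derby metastore created when no hive-site.xml is on
// Spark's classpath).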


Re: newbie best practices: is spark-ec2 intended to be used to manage long-lasting infrastructure ?

2015-12-03 Thread Divya Gehlot
Hello,
I too have the same queries in mind.
What are the advantages of using EC2 as compared to normal servers for
Spark and other big data product development?
Hope to get inputs from the community.

Thanks,
Divya
On Dec 4, 2015 6:05 AM, "Andy Davidson" 
wrote:

> About 2 months ago I used spark-ec2 to set up a small cluster. The cluster
> runs a spark streaming app 7x24 and stores the data to hdfs. I also need to
> run some batch analytics on the data.
>
> Now that I have a little more experience, I wonder if this was a good way
> to set up the cluster, given the following issues:
>
>1. I have not been able to find explicit directions for upgrading the
>spark version
>   1.
>   
> http://search-hadoop.com/m/q3RTt7E0f92v0tKh2=Re+Upgrading+Spark+in+EC2+clusters
>    2. I am not sure where the data is physically stored. I think I may
>    accidentally lose all my data
>    3. spark-ec2 makes it easy to launch a cluster with as many machines
>    as you like; however, it's not clear how I would add slaves to an
>    existing installation
>
>
> In our Java streaming app we call rdd.saveAsTextFile(“hdfs://path”);
>
> ephemeral-hdfs/conf/hdfs-site.xml:
>
>   
>
> dfs.data.dir
>
> /mnt/ephemeral-hdfs/data,/mnt2/ephemeral-hdfs/data
>
>   
>
>
> persistent-hdfs/conf/hdfs-site.xml
>
>
> $ mount
>
> /dev/xvdb on /mnt type ext3 (rw,nodiratime)
>
> /dev/xvdf on /mnt2 type ext3 (rw,nodiratime)
>
>
> http://spark.apache.org/docs/latest/ec2-scripts.html
>
> *"*The spark-ec2 script also supports pausing a cluster. In this case,
> the VMs are stopped but not terminated, so they *lose all data on
> ephemeral disks* but keep the data in their root partitions and their
> persistent-hdfs.”
>
>
> Initially I thought using HDFS was a good idea. spark-ec2 makes HDFS easy
> to use. I incorrectly thought spark somehow knew how HDFS partitioned my
> data.
>
> I think many people are using amazon s3. I do not have any direct
> experience with S3. My concern would be that the data is not physically
> stored close to my slaves, i.e. high communication costs.
>
> Any suggestions would be greatly appreciated
>
> Andy
>


how to skip headers when reading multiple files

2015-12-02 Thread Divya Gehlot
Hi,
I am a newbie to Spark and Scala.
One of my requirements is to read and process multiple text files with
headers using the DataFrame API.
How can I skip the headers when processing data with the DataFrame API?

Thanks in advance .
Regards,
Divya
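
For reference, a minimal sketch of one common approach, assuming the
spark-csv package is on the classpath and that every input file shares the
same header line (the path below is hypothetical and may be a glob):

// With "header" -> "true", spark-csv treats the first line as a header
// and drops matching header lines from the data.
val df = sqlContext.load(
  "com.databricks.spark.csv",
  Map("path" -> "/data/input/*.csv", "header" -> "true"))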


persist spark output in hive using DataFrame and saveAsTable API

2015-12-07 Thread Divya Gehlot
Hi,
I am a newbie to Spark.
Could somebody guide me on how I can persist my Spark RDD results in Hive
using the saveAsTable API?
Would appreciate it if you could provide an example for a Hive external table.

Thanks in advance.
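
For reference, a minimal sketch of the saveAsTable call in the Spark 1.3 API,
assuming df is a DataFrame obtained through a HiveContext; the table names,
source, and path below are hypothetical:

import org.apache.spark.sql.SaveMode
// Managed Hive table: the data is written into the Hive warehouse.
df.saveAsTable("person_table", SaveMode.Overwrite)
// External-style table (an assumption, not verified here): passing a "path"
// option makes the data-source table point at an existing HDFS location.
df.saveAsTable("person_ext", "org.apache.spark.sql.hive.orc.DefaultSource",
  SaveMode.Overwrite, Map("path" -> "/user/hive/external/person_ext"))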


Re: persist spark output in hive using DataFrame and saveAsTable API

2015-12-07 Thread Divya Gehlot
My input format is CSV and I am using Spark 1.3 (HDP 2.2 comes with Spark
1.3, so ...).
I am using spark-csv to read my CSV file and the DataFrame API to process it.
I followed these steps
<http://hortonworks.com/hadoop-tutorial/using-hive-with-orc-from-apache-spark/>
and was successfully able to read the ORC file.
However, these are temp tables, so the data does not get stored in Hive.
I am trying to figure out how to do that.

Thanks in advance
Divya


On 7 December 2015 at 17:43, Fengdong Yu <fengdo...@everstring.com> wrote:

> If your RDD is JSON format, that’s easy.
>
> val df = sqlContext.read.json(rdd)
> df.saveAsTable(“your_table_name")
>
>
>
> > On Dec 7, 2015, at 5:28 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
> >
> > Hi,
> > I am new bee to Spark.
> > Could somebody guide me how can I persist my spark RDD results in Hive
> using SaveAsTable API.
> > Would  appreciate if you could  provide the example for hive external
> table.
> >
> > Thanks in advance.
> >
> >
>
>


getting error while persisting in hive

2015-12-09 Thread Divya Gehlot
Hi,
I am using Spark 1.4.1.
I am getting an error when persisting Spark DataFrame output to Hive:

> scala>
> df.select("name","age").write().format("com.databricks.spark.csv").mode(SaveMode.Append).saveAsTable("PersonHiveTable");
> :39: error: org.apache.spark.sql.DataFrameWriter does not take
> parameters
>
>

Can somebody point out what's wrong here?

Would really appreciate your help.

Thanks in advance

Divya
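
For what it's worth, the error message above usually comes from the extra
parentheses: in Scala, write is a parameterless method on DataFrame that
returns a DataFrameWriter, so it is called without (). A likely fix, inferred
from the message alone rather than confirmed here:

import org.apache.spark.sql.SaveMode
df.select("name", "age").write
  .format("com.databricks.spark.csv")
  .mode(SaveMode.Append)
  .saveAsTable("PersonHiveTable")

Whether the spark-csv data source supports saveAsTable is a separate question;
the snippet only addresses the compile error shown above.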


set up spark 1.4.1 as default spark engine in HDP 2.2/2.3

2015-12-08 Thread Divya Gehlot
Hi,
As per my requirement I need to use Spark 1.4.1, but HDP doesn't come with
Spark 1.4.1.
As instructed in this hortonworks page

I am able to set up Spark 1.4 in HDP, but when I run the spark shell it
shows the Spark 1.3.1 REPL instead of Spark 1.4.1.
Do I need to make any configuration changes apart from the instructions
given in the above-mentioned page?
How do I set Spark 1.4.1 as the default Spark engine?
Options:
1. Do I need to remove the current Spark 1.3.1 dir?
2. I named my Spark installation dir spark 1.4.1; do I need to rename it to
spark?
3. Is there any configuration that needs to change in HDP to set Spark 1.4.1
as the default Spark engine?

Would really appreciate your help.

Thanks,
Regards,


Difference between Local Hive Metastore server and A Hive-based Metastore server

2015-12-17 Thread Divya Gehlot
Hi,
I am a newbie to Spark and am using 1.4.1.
I got confused between a local metastore server and a Hive-based metastore
server.
Can somebody share the use cases for when to use which one, and the pros and
cons?

I am using HDP 2.3.2, in which hive-site.xml is already in the Spark
configuration directory; that means HDP 2.3.2 already uses a Hive-based
metastore server.


Pros and cons -Saving spark data in hive

2015-12-15 Thread Divya Gehlot
Hi,
I am a newbie to Spark, and I am exploring the options and the pros and cons
of which one will work best with the Spark and Hive contexts. My dataset
inputs are CSV files; I am using Spark to process my data and saving it in
Hive using a HiveContext.

1) Process the CSV file using the spark-csv package, create a temp table, and
store the data in Hive using the hive context (a rough sketch of this option
follows below).
2) Process the file as a normal text file in the sqlcontext, register it as a
temp table in the sqlcontext, store it as an ORC file, and read that ORC file
in the hive context and store it in Hive.
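
A minimal sketch of option 1, assuming Spark 1.4+ and the spark-csv package;
the table names and path are hypothetical:

// Read the CSV into a DataFrame, expose it as a temp table, then let the
// HiveContext persist it with a CREATE TABLE ... AS SELECT.
val df = hiveContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/tmp/input.csv")
df.registerTempTable("csv_temp")
hiveContext.sql("CREATE TABLE csv_hive_table AS SELECT * FROM csv_temp")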

Is there any other good option apart from those mentioned above?
Would really appreciate the inputs.
Thanks in advance.

Thanks,
Regards,
Divya


org.apache.spark.SparkException: Task failed while writing rows.+ Spark output data to hive table

2015-12-10 Thread Divya Gehlot
Hi,

I am using HDP 2.3.2 with Spark 1.4.1 and am trying to insert data into a
Hive table using a hive context.

Below is the sample code


spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m
//Sample code
import org.apache.spark.sql.SQLContext
import sqlContext.implicits._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sc.textFile("/user/spark/people.txt")
val schemaString = "name age"
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
//Create hive context
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//Apply the schema to the RDD of Rows
val df = hiveContext.createDataFrame(rowRDD, schema);
val options = Map("path" -> "hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/personhivetable")
df.write.format("org.apache.spark.sql.hive.orc.DefaultSource").options(options).saveAsTable("personhivetable")

Getting below error :


org.apache.spark.SparkException: Task failed while writing rows.
  at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$writeRows$1(commands.scala:191)
  at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$anonfun$insert$1.apply(commands.scala:160)
  at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$anonfun$insert$1.apply(commands.scala:160)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
  at org.apache.spark.scheduler.Task.run(Task.scala:70)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
  at $line30.$read$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$anonfun$2.apply(:29)
  at $line30.$read$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$anonfun$2.apply(:29)
  at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
  at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$writeRows$1(commands.scala:182)
  ... 8 more

Is it a configuration issue?

When I googled it, I found out that an environment variable named
HIVE_CONF_DIR should be set in spark-env.sh.

Then I checked spark-env.sh in HDP 2.3.2 and couldn't find the environment
variable HIVE_CONF_DIR.

Do I need to add the above-mentioned variable to insert Spark output data
into Hive tables?

Would really appreciate pointers.

Thanks,

Divya
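
For reference, the ArrayIndexOutOfBoundsException: 1 in the trace above comes
from p(1) in the row-building map, which fails on any line that does not
split into two comma-separated fields. A defensive sketch (an assumption
about the input, not something confirmed in this thread) is to drop such
lines first:

val rowRDD = people.map(_.split(","))
  .filter(_.length >= 2)                  // skip malformed or empty lines
  .map(p => Row(p(0), p(1).trim))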



Re: Timestamp datatype in dataframe + Spark 1.4.1

2015-12-29 Thread Divya Gehlot
Hello Community Users,
I was able to resolve the issue.
The issue was the input data format: by default Excel writes dates as
2001/01/09, whereas Spark SQL expects the 2001-01-09 format.

Here is the sample code:


SQL context available as sqlContext.

scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext

scala> import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql.hive.orc._

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
15/12/29 04:29:39 WARN SparkConf: The configuration key
'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark
1.3 and and may be removed in the future. Please use the new key
'spark.yarn.am.waitTime' instead.
15/12/29 04:29:39 INFO HiveContext: Initializing execution hive, version
0.13.1
hiveContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@7312f6d8

scala> import org.apache.spark.sql.types.{StructType, StructField,
StringType, IntegerType,FloatType ,LongType ,TimestampType ,DateType };
import org.apache.spark.sql.types.{StructType, StructField, StringType,
IntegerType, FloatType, LongType, TimestampType, DateType}

scala> val customSchema = StructType(Seq(StructField("year", DateType,
true),StructField("make", StringType, true),StructField("model",
StringType, true),StructField("comment", StringType,
true),StructField("blank", StringType, true)))
customSchema: org.apache.spark.sql.types.StructType =
StructType(StructField(year,DateType,true),
StructField(make,StringType,true), StructField(model,StringType,true),
StructField(comment,StringType,true), StructField(blank,StringType,true))

scala> val df =
hiveContext.read.format("com.databricks.spark.csv").option("header",
"true").schema(customSchema).load("/tmp/TestDivya/carsdate.csv")
15/12/29 04:30:27 INFO HiveContext: Initializing HiveMetastoreConnection
version 0.13.1 using Spark classes.
df: org.apache.spark.sql.DataFrame = [year: date, make: string, model:
string, comment: string, blank: string]

scala> df.printSchema()
root
 |-- year: date (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)


scala> val selectedData = df.select("year", "model")
selectedData: org.apache.spark.sql.DataFrame = [year: date, model: string]

scala> selectedData.show()
15/12/29 04:31:20 INFO MemoryStore: ensureFreeSpace(216384) called with
curMem=0, maxMem=278302556
15/12/29 04:31:20 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 211.3 KB, free 265.2 MB)

15/12/29 04:31:24 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have
all completed, from pool
15/12/29 04:31:24 INFO DAGScheduler: ResultStage 2 (show at :35)
finished in 0.051 s
15/12/29 04:31:24 INFO DAGScheduler: Job 2 finished: show at :35,
took 0.063356 s
+----------+-----+
|      year|model|
+----------+-----+
|2001-01-01|    S|
|2010-12-10|     |
|2009-01-11| E350|
|2008-01-01| Volt|
+----------+-----+

On 30 December 2015 at 00:42, Annabel Melongo <melongo_anna...@yahoo.com>
wrote:

> Divya,
>
> From reading the post, it appears that you resolved this issue. Great job!
>
> I would recommend putting the solution here as well so that it helps
> another developer down the line.
>
> Thanks
>
>
> On Monday, December 28, 2015 8:56 PM, Divya Gehlot <
> divya.htco...@gmail.com> wrote:
>
>
> Hi,
> Link to schema issue
> <https://community.hortonworks.com/questions/8124/returns-empty-result-set-when-using-timestamptype.html>
> Please let me know if have issue in viewing the above link
>
> On 28 December 2015 at 23:00, Annabel Melongo <melongo_anna...@yahoo.com>
> wrote:
>
> Divya,
>
> Why don't you share how you create the dataframe using the schema as
> stated in 1)
>
>
> On Monday, December 28, 2015 4:42 AM, Divya Gehlot <
> divya.htco...@gmail.com> wrote:
>
>
> Hi,
> I have input data set which is CSV file where I have date columns.
> My output will also be CSV file and will using this output CSV  file as
> for hive table creation.
> I have few queries :
> 1.I tried using custom schema using Timestamp but it is returning empty
> result set when querying the dataframes.
> 2.Can I use String datatype in Spark for date column and while creating
> table can define it as date type ? Partitioning of my hive table will be
> date column.
>
> Would really  appreciate if you share some sample code for timestamp in
> Dataframe whereas same can be used while creating the hive table.
>
>
>
> Thanks,
> Divya
>
>
>
>
>
>


Error while starting Zeppelin Service in HDP2.3.2

2015-12-30 Thread Divya Gehlot
Hi,
I am getting the following error while starting the Zeppelin service from
the Ambari server.

/var/lib/ambari-agent/data/errors-2408.txt

Traceback (most recent call last):
  File 
"/var/lib/ambari-agent/cache/stacks/HDP/2.3/services/ZEPPELIN/package/scripts/master.py",
line 295, in 
Master().execute()
  File 
"/usr/lib/python2.6/site-packages/resource_management/libraries/script/script.py",
line 216, in execute
method(env)
  File 
"/var/lib/ambari-agent/cache/stacks/HDP/2.3/services/ZEPPELIN/package/scripts/master.py",
line 230, in start
Execute (params.zeppelin_dir+'/bin/zeppelin-daemon.sh start >> ' +
params.zeppelin_log_file, user=params.zeppelin_user)
  File "/usr/lib/python2.6/site-packages/resource_management/core/base.py",
line 154, in __init__
self.env.run()
  File 
"/usr/lib/python2.6/site-packages/resource_management/core/environment.py",
line 152, in run
self.run_action(resource, action)
  File 
"/usr/lib/python2.6/site-packages/resource_management/core/environment.py",
line 118, in run_action
provider_action()
  File 
"/usr/lib/python2.6/site-packages/resource_management/core/providers/system.py",
line 260, in action_run
tries=self.resource.tries, try_sleep=self.resource.try_sleep)
  File "/usr/lib/python2.6/site-packages/resource_management/core/shell.py",
line 70, in inner
result = function(command, **kwargs)
  File "/usr/lib/python2.6/site-packages/resource_management/core/shell.py",
line 92, in checked_call
tries=tries, try_sleep=try_sleep)
  File "/usr/lib/python2.6/site-packages/resource_management/core/shell.py",
line 140, in _call_wrapper
result = _call(command, **kwargs_copy)
  File "/usr/lib/python2.6/site-packages/resource_management/core/shell.py",
line 290, in _call
err_msg = Logger.filter_text(("Execution of '%s' returned %d. %s")
% (command_alias, code, all_output))
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe2 in position
31: ordinal not in range(128)

stdout:   /var/lib/ambari-agent/data/output-2408.txt

2015-12-31 02:01:20,438 - Group['hadoop'] {}
2015-12-31 02:01:20,439 - Group['users'] {}
2015-12-31 02:01:20,439 - Group['zeppelin'] {}
2015-12-31 02:01:20,439 - Group['knox'] {}
2015-12-31 02:01:20,439 - Group['spark'] {}
2015-12-31 02:01:20,440 - User['hive'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,440 - User['oozie'] {'gid': 'hadoop', 'groups': [u'users']}
2015-12-31 02:01:20,441 - User['zeppelin'] {'gid': 'hadoop', 'groups':
[u'hadoop']}
2015-12-31 02:01:20,441 - User['ambari-qa'] {'gid': 'hadoop',
'groups': [u'users']}
2015-12-31 02:01:20,442 - User['flume'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,442 - User['hdfs'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,443 - User['knox'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,443 - User['spark'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,444 - User['mapred'] {'gid': 'hadoop', 'groups':
[u'hadoop']}
2015-12-31 02:01:20,444 - User['tez'] {'gid': 'hadoop', 'groups': [u'users']}
2015-12-31 02:01:20,445 - User['zookeeper'] {'gid': 'hadoop',
'groups': [u'hadoop']}
2015-12-31 02:01:20,445 - User['sqoop'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,446 - User['yarn'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,446 - User['hcat'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,447 - User['ams'] {'gid': 'hadoop', 'groups': [u'hadoop']}
2015-12-31 02:01:20,447 -
File['/var/lib/ambari-agent/tmp/changeUid.sh'] {'content':
StaticFile('changeToSecureUid.sh'), 'mode': 0555}
2015-12-31 02:01:20,448 -
Execute['/var/lib/ambari-agent/tmp/changeUid.sh ambari-qa
/tmp/hadoop-ambari-qa,/tmp/hsperfdata_ambari-qa,/home/ambari-qa,/tmp/ambari-qa,/tmp/sqoop-ambari-qa']
{'not_if': '(test $(id -u ambari-qa) -gt 1000) || (false)'}
2015-12-31 02:01:20,452 - Skipping
Execute['/var/lib/ambari-agent/tmp/changeUid.sh ambari-qa
/tmp/hadoop-ambari-qa,/tmp/hsperfdata_ambari-qa,/home/ambari-qa,/tmp/ambari-qa,/tmp/sqoop-ambari-qa']
due to not_if
2015-12-31 02:01:20,453 - Group['hdfs'] {'ignore_failures': False}
2015-12-31 02:01:20,453 - User['hdfs'] {'ignore_failures': False,
'groups': [u'hadoop', u'hdfs']}
2015-12-31 02:01:20,453 - Directory['/etc/hadoop'] {'mode': 0755}
2015-12-31 02:01:20,465 -
File['/usr/hdp/current/hadoop-client/conf/hadoop-env.sh'] {'content':
InlineTemplate(...), 'owner': 'hdfs', 'group': 'hadoop'}
2015-12-31 02:01:20,466 -
Directory['/var/lib/ambari-agent/tmp/hadoop_java_io_tmpdir'] {'owner':
'hdfs', 'group': 'hadoop', 'mode': 0777}
2015-12-31 02:01:20,474 - Execute[('setenforce', '0')] {'not_if': '(!
which getenforce ) || (which getenforce && getenforce | grep -q
Disabled)', 'sudo': True, 'only_if': 'test -f /selinux/enforce'}
2015-12-31 02:01:20,482 - Skipping Execute[('setenforce', '0')] due to only_if
2015-12-31 02:01:20,482 - Directory['/var/log/hadoop'] {'owner':
'root', 'mode': 0775, 'group': 'hadoop', 'recursive': True,
'cd_access': 'a'}

error creating custom schema

2015-12-23 Thread Divya Gehlot
Hi,
I am trying to create a custom schema but it's throwing the error below:


scala> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.HiveContext
>
> scala> import org.apache.spark.sql.hive.orc._
> import org.apache.spark.sql.hive.orc._
>
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> 15/12/23 04:42:09 WARN SparkConf: The configuration key
> 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark
> 1.3 and and may be removed in the future. Please use the new key
> 'spark.yarn.am.waitTime' instead.
> 15/12/23 04:42:09 INFO HiveContext: Initializing execution hive, version
> 0.13.1
> hiveContext: org.apache.spark.sql.hive.HiveContext =
> org.apache.spark.sql.hive.HiveContext@3ca50ddf
>
> scala> import org.apache.spark.sql.types.{StructType, StructField,
> StringType, IntegerType,FloatType ,LongType ,TimestampType };
> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> IntegerType, FloatType, LongType, TimestampType}
>
> scala> val loandepoSchema = StructType(Seq(
>  | StructField("C1" StringType, true),
>  | StructField("COLUMN2", StringType , true),
>  | StructField("COLUMN3", StringType, true),
>  | StructField("COLUMN4", StringType, true),
>  | StructField("COLUMN5", StringType , true),
>  | StructField("COLUMN6", StringType, true),
>  | StructField("COLUMN7", StringType, true),
>  | StructField("COLUMN8", StringType, true),
>  | StructField("COLUMN9", StringType, true),
>  | StructField("COLUMN10", StringType, true),
>  | StructField("COLUMN11", StringType, true),
>  | StructField("COLUMN12", StringType, true),
>  | StructField("COLUMN13", StringType, true),
>  | StructField("COLUMN14", StringType, true),
>  | StructField("COLUMN15", StringType, true),
>  | StructField("COLUMN16", StringType, true),
>  | StructField("COLUMN17", StringType, true)
>  | StructField("COLUMN18", StringType, true),
>  | StructField("COLUMN19", StringType, true),
>  | StructField("COLUMN20", StringType, true),
>  | StructField("COLUMN21", StringType, true),
>  | StructField("COLUMN22", StringType, true)))
> :25: error: value StringType is not a member of String
>StructField("C1" StringType, true),
> ^
>

Would really appreciate the guidance/pointers.

Thanks,
Divya
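
For reference, the compiler message above points at the missing comma between
the field name and its type in the first StructField (note that a comma is
also missing after the COLUMN17 line). A corrected sketch of the opening
fields:

val loandepoSchema = StructType(Seq(
  StructField("C1", StringType, true),        // comma added after "C1"
  StructField("COLUMN2", StringType, true),
  // ... remaining fields, each ending with a comma, including the
  // StructField("COLUMN17", StringType, true), line ...
  StructField("COLUMN22", StringType, true)))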


Re: DataFrame Save is writing just column names while saving

2015-12-27 Thread Divya Gehlot
yes
Sharing the execution flow

15/12/28 00:19:15 INFO SessionState: No Tez session required at this point.
hive.execution.engine=mr.
15/12/28 00:19:15 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext

scala> import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql.hive.orc._

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
15/12/28 00:20:15 WARN SparkConf: The configuration key
'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark
1.3 and and may be removed in the future. Please use the new key
'spark.yarn.am.waitTime' instead.
15/12/28 00:20:15 INFO HiveContext: Initializing execution hive, version
0.13.1
hiveContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@9046f81

scala> import org.apache.spark.sql.types.{StructType, StructField,
StringType, IntegerType,FloatType ,LongType ,TimestampType };
import org.apache.spark.sql.types.{StructType, StructField, StringType,
IntegerType, FloatType, LongType, TimestampType}

scala> val carsSchema = StructType(Seq(StructField("year", IntegerType,
true),StructField("make", StringType, true),StructField("model",
StringType, true),StructField("comment", StringType,
true),StructField("blank", StringType, true)))
carsSchema: org.apache.spark.sql.types.StructType =
StructType(StructField(year,IntegerType,true),
StructField(make,StringType,true), StructField(model,StringType,true),
StructField(comment,StringType,true), StructField(blank,StringType,true))

scala> val carsdf =
hiveContext.read.format("com.databricks.spark.csv").option("header",
"true").schema(carsSchema).load("/tmp/TestDivya/cars.csv")
15/12/28 00:20:45 INFO HiveContext: Initializing HiveMetastoreConnection
version 0.13.1 using Spark classes.
carsdf: org.apache.spark.sql.DataFrame = [year: int, make: string, model:
string, comment: string, blank: string]

scala> val carUsersSchema = StructType(Seq(StructField("Name", StringType,
true),StructField("Car_Model", StringType  , true)))
carUsersSchema: org.apache.spark.sql.types.StructType =
StructType(StructField(Name,StringType,true),
StructField(Car_Model,StringType,true))

scala> val carUsersdf =
hiveContext.read.format("com.databricks.spark.csv").option("header",
"false").schema(carUsersSchema).load("/tmp/TestDivya/CarUsers.csv")
carUsersdf: org.apache.spark.sql.DataFrame = [Name: string, Car_Model:
string]

scala> val joineddf = (carsdf.join(carUsersdf, carsdf("model") ===
carUsersdf("Car_Model"))).select(carUsersdf("Name"),carsdf("make"),carUsersdf("Car_Model"))
joineddf: org.apache.spark.sql.DataFrame = [Name: string, make: string,
Car_Model: string]

scala> joineddf.collect.foreach(println)

..

15/12/28 00:21:35 INFO DAGScheduler: ResultStage 3 (collect at
:39) finished in 2.261 s
15/12/28 00:21:35 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have
all completed, from pool
15/12/28 00:21:35 INFO DAGScheduler: Job 1 finished: collect at
:39, took 5.323441 s
[Name3,Chevy,Volt]
[Name6,Chevy,Volt]
[Name1,Tesla,S]
[Name4,Tesla,S]
[Name2,Ford,E350]
[Name5,Ford,E350]

scala>


scala> joineddf.write.format("com.databricks.spark.csv").option("header",
"true").save("/tmp/TestDivya/CarUserData.csv")
15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
..
..
15/12/28 00:25:40 INFO YarnScheduler: Removed TaskSet 6.0, whose tasks have
all completed, from pool
15/12/28 00:25:40 INFO DAGScheduler: Job 2 finished: saveAsTextFile at
package.scala:157, took 9.293578 s

P.S. : Attaching the output file

On 28 December 2015 at 12:52, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you confirm that file1df("COLUMN2") and file2df("COLUMN10") appeared
> in the output of joineddf.collect.foreach(println)
>  ?
>
> Thanks
>
> On Sun, Dec 27, 2015 at 6:32 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>> I am trying to join two dataframes and able to display the results in the
>> console ater join. I am saving that data and and saving in the joined data
>> in CSV format using spark-csv api . Its just saving the column names not
>> data at all.
>>
>> Below is the sample code for the reference:
>>
>> spark-shell   --packages com.databricks:spark-csv_2.10:1.1.0  --master
>&

DataFrame Save is writing just column names while saving

2015-12-27 Thread Divya Gehlot
Hi,
I am trying to join two DataFrames and am able to display the results in the
console after the join. I am then saving the joined data in CSV format using
the spark-csv API. It is saving just the column names, not the data at all.

Below is the sample code for the reference:

spark-shell   --packages com.databricks:spark-csv_2.10:1.1.0  --master
> yarn-client --driver-memory 512m --executor-memory 512m
>
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.orc._
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> IntegerType,FloatType ,LongType ,TimestampType };
>
> val firstSchema = StructType(Seq(StructField("COLUMN1", StringType,
> true),StructField("COLUMN2", StringType, true),StructField("COLUMN2",
> StringType, true),StructField("COLUMN3", StringType, true),
> StructField("COLUMN4", StringType, true),StructField("COLUMN5",
> StringType, true)))
> val file1df =
> hiveContext.read.format("com.databricks.spark.csv").option("header",
> "true").schema(firstSchema).load("/tmp/File1.csv")
>
>
> val secondSchema = StructType(Seq(
> StructField("COLUMN1", StringType, true),
> StructField("COLUMN2", NullType  , true),
> StructField("COLUMN3", TimestampType , true),
> StructField("COLUMN4", TimestampType , true),
> StructField("COLUMN5", NullType , true),
> StructField("COLUMN6", StringType, true),
> StructField("COLUMN7", IntegerType, true),
> StructField("COLUMN8", IntegerType, true),
> StructField("COLUMN9", StringType, true),
> StructField("COLUMN10", IntegerType, true),
> StructField("COLUMN11", IntegerType, true),
> StructField("COLUMN12", IntegerType, true)))
>
>
> val file2df =
> hiveContext.read.format("com.databricks.spark.csv").option("header",
> "false").schema(secondSchema).load("/tmp/file2.csv")
> val joineddf = file1df.join(file2df, file1df("COLUMN1") ===
> file2df("COLUMN6"))
> val selecteddata = joineddf.select(file1df("COLUMN2"),file2df("COLUMN10"))
>
//the below statement is printing the joined data

> joineddf.collect.foreach(println)
>


> //this statement saves the CSV file, but only the column names mentioned
> above in the select are being saved
> selecteddata.write.format("com.databricks.spark.csv").option("header",
> "true").save("/tmp/JoinedData.csv")
>


Would really appreciate the pointers /help.

Thanks,
Divya


DataFrame Vs RDDs ... Which one to use When ?

2015-12-27 Thread Divya Gehlot
Hi,
I am a newbie to Spark and a bit confused about RDDs and DataFrames in Spark.
Can somebody explain, with use cases, which one to use when?

Would really appreciate the clarification .

Thanks,
Divya


returns empty result set when using TimestampType and NullType as StructType +DataFrame +Scala + Spark 1.4.1

2015-12-28 Thread Divya Gehlot
SQL context available as sqlContext.

>
> scala> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.HiveContext
>
> scala> import org.apache.spark.sql.hive.orc._
> import org.apache.spark.sql.hive.orc._
>
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> 15/12/28 03:34:57 WARN SparkConf: The configuration key
> 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark
> 1.3 and and may be removed in the future. Please use the new key
> 'spark.yarn.am.waitTime' instead.
> 15/12/28 03:34:57 INFO HiveContext: Initializing execution hive, version
> 0.13.1
> hiveContext: org.apache.spark.sql.hive.HiveContext =
> org.apache.spark.sql.hive.HiveContext@3413fbe
>
> scala> import org.apache.spark.sql.types.{StructType, StructField,
> StringType, IntegerType,FloatType ,LongType ,TimestampType,NullType };
> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> IntegerType, FloatType, LongType, TimestampType, NullType}
>
> scala> val loandepoSchema = StructType(Seq(
>  | StructField("COLUMN1", StringType, true),
>  | StructField("COLUMN2", StringType  , true),
>  | StructField("COLUMN3", TimestampType , true),
>  | StructField("COLUMN4", TimestampType , true),
>  | StructField("COLUMN5", StringType , true),
>  | StructField("COLUMN6", StringType, true),
>  | StructField("COLUMN7", IntegerType, true),
>  | StructField("COLUMN8", IntegerType, true),
>  | StructField("COLUMN9", StringType, true),
>  | StructField("COLUMN10", IntegerType, true),
>  | StructField("COLUMN11", IntegerType, true),
>  | StructField("COLUMN12", IntegerType, true),
>  | StructField("COLUMN13", StringType, true),
>  | StructField("COLUMN14", StringType, true),
>  | StructField("COLUMN15", StringType, true),
>  | StructField("COLUMN16", StringType, true),
>  | StructField("COLUMN17", StringType, true),
>  | StructField("COLUMN18", StringType, true),
>  | StructField("COLUMN19", StringType, true),
>  | StructField("COLUMN20", StringType, true),
>  | StructField("COLUMN21", StringType, true),
>  | StructField("COLUMN22", StringType, true)))
> loandepoSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField(COLUMN1,StringType,true),
> StructField(COLUMN2,StringType,true),
> StructField(COLUMN3,TimestampType,true),
> StructField(COLUMN4,TimestampType,true),
> StructField(COLUMN5,StringType,true), StructField(COLUMN6,StringType,true),
> StructField(COLUMN7,IntegerType,true),
> StructField(COLUMN8,IntegerType,true),
> StructField(COLUMN9,StringType,true),
> StructField(COLUMN10,IntegerType,true),
> StructField(COLUMN11,IntegerType,true),
> StructField(COLUMN12,IntegerType,true),
> StructField(COLUMN13,StringType,true),
> StructField(COLUMN14,StringType,true),
> StructField(COLUMN15,StringType,true),
> StructField(COLUMN16,StringType,true),
> StructField(COLUMN17,StringType,true),
> StructField(COLUMN18,StringType,true), StructField(COLUMN19,Strin...
> scala> val lonadepodf =
> hiveContext.read.format("com.databricks.spark.csv").option("header",
> "true").schema(loandepoSchema).load("/tmp/TestDivya/loandepo_10K.csv")
> 15/12/28 03:37:52 INFO HiveContext: Initializing HiveMetastoreConnection
> version 0.13.1 using Spark classes.
> lonadepodf: org.apache.spark.sql.DataFrame = [COLUMN1: string, COLUMN2:
> string, COLUMN3: timestamp, COLUMN4: timestamp, COLUMN5: string, COLUMN6:
> string, COLUMN7: int, COLUMN8: int, COLUMN9: string, COLUMN10: int,
> COLUMN11: int, COLUMN12: int, COLUMN13: string, COLUMN14: string, COLUMN15:
> string, COLUMN16: string, COLUMN17: string, COLUMN18: string, COLUMN19:
> string, COLUMN20: string, COLUMN21: string, COLUMN22: string]
>
> scala> lonadepodf.select("COLUMN1").show(10)
> 15/12/28 03:38:01 INFO MemoryStore: ensureFreeSpace(216384) called with
> curMem=0, maxMem=278302556
> 15/12/28 03:38:01 INFO MemoryStore: Block broadcast_0 stored as values in
> memory (estimated size 211.3 KB, free 265.2 MB)
>
> ...
> 15/12/28 03:38:07 INFO DAGScheduler: ResultStage 2 (show at :33)
> finished in 0.653 s
> 15/12/28 03:38:07 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks
> have all completed, from pool
> 15/12/28 03:38:07 INFO DAGScheduler: Job 2 finished: show at :33,
> took 0.669388 s
> +-------+
> |COLUMN1|
> +-------+
> +-------+
>
>
> scala> val loandepoSchema = StructType(Seq(
>  | StructField("COLUMN1", StringType, true),
>  | StructField("COLUMN2", StringType  , true),
>  | StructField("COLUMN3", StringType , true),
>  | StructField("COLUMN4", StringType , true),
>  | StructField("COLUMN5", StringType , true),
>  | StructField("COLUMN6", StringType, true),
>  | StructField("COLUMN7", StringType, true),
>  | StructField("COLUMN8", StringType, true),
>  | StructField("COLUMN9", StringType, true),
>  | 

Timestamp datatype in dataframe + Spark 1.4.1

2015-12-28 Thread Divya Gehlot
Hi,
I have an input data set which is a CSV file with date columns.
My output will also be a CSV file, and I will be using this output CSV file
for hive table creation.
I have a few queries:
1. I tried using a custom schema with Timestamp, but it returns an empty
result set when querying the dataframes.
2. Can I use the String datatype in Spark for the date column and define it
as a date type while creating the table? The partitioning of my hive table
will be on the date column.

Would really appreciate it if you could share some sample code for a
timestamp in a DataFrame such that the same can be used while creating the
hive table.



Thanks,
Divya


returns empty result set when using TimestampType and NullType as StructType +DataFrame +Scala + Spark 1.4.1

2015-12-28 Thread Divya Gehlot
>
> SQL context available as sqlContext.
>
> scala> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.HiveContext
>
> scala> import org.apache.spark.sql.hive.orc._
> import org.apache.spark.sql.hive.orc._
>
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> 15/12/28 03:34:57 WARN SparkConf: The configuration key
> 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark
> 1.3 and and may be removed in the future. Please use the new key
> 'spark.yarn.am.waitTime' instead.
> 15/12/28 03:34:57 INFO HiveContext: Initializing execution hive, version
> 0.13.1
> hiveContext: org.apache.spark.sql.hive.HiveContext =
> org.apache.spark.sql.hive.HiveContext@3413fbe
>
> scala> import org.apache.spark.sql.types.{StructType, StructField,
> StringType, IntegerType,FloatType ,LongType ,TimestampType,NullType };
> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> IntegerType, FloatType, LongType, TimestampType, NullType}
>
> scala> val loandepoSchema = StructType(Seq(
>  | StructField("COLUMN1", StringType, true),
>  | StructField("COLUMN2", StringType  , true),
>  | StructField("COLUMN3", TimestampType , true),
>  | StructField("COLUMN4", TimestampType , true),
>  | StructField("COLUMN5", StringType , true),
>  | StructField("COLUMN6", StringType, true),
>  | StructField("COLUMN7", IntegerType, true),
>  | StructField("COLUMN8", IntegerType, true),
>  | StructField("COLUMN9", StringType, true),
>  | StructField("COLUMN10", IntegerType, true),
>  | StructField("COLUMN11", IntegerType, true),
>  | StructField("COLUMN12", IntegerType, true),
>  | StructField("COLUMN13", StringType, true),
>  | StructField("COLUMN14", StringType, true),
>  | StructField("COLUMN15", StringType, true),
>  | StructField("COLUMN16", StringType, true),
>  | StructField("COLUMN17", StringType, true),
>  | StructField("COLUMN18", StringType, true),
>  | StructField("COLUMN19", StringType, true),
>  | StructField("COLUMN20", StringType, true),
>  | StructField("COLUMN21", StringType, true),
>  | StructField("COLUMN22", StringType, true)))
> loandepoSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField(COLUMN1,StringType,true),
> StructField(COLUMN2,StringType,true),
> StructField(COLUMN3,TimestampType,true),
> StructField(COLUMN4,TimestampType,true),
> StructField(COLUMN5,StringType,true), StructField(COLUMN6,StringType,true),
> StructField(COLUMN7,IntegerType,true),
> StructField(COLUMN8,IntegerType,true),
> StructField(COLUMN9,StringType,true),
> StructField(COLUMN10,IntegerType,true),
> StructField(COLUMN11,IntegerType,true),
> StructField(COLUMN12,IntegerType,true),
> StructField(COLUMN13,StringType,true),
> StructField(COLUMN14,StringType,true),
> StructField(COLUMN15,StringType,true),
> StructField(COLUMN16,StringType,true),
> StructField(COLUMN17,StringType,true),
> StructField(COLUMN18,StringType,true), StructField(COLUMN19,Strin...
> scala> val lonadepodf =
> hiveContext.read.format("com.databricks.spark.csv").option("header",
> "true").schema(loandepoSchema).load("/tmp/TestDivya/loandepo_10K.csv")
> 15/12/28 03:37:52 INFO HiveContext: Initializing HiveMetastoreConnection
> version 0.13.1 using Spark classes.
> lonadepodf: org.apache.spark.sql.DataFrame = [COLUMN1: string, COLUMN2:
> string, COLUMN3: timestamp, COLUMN4: timestamp, COLUMN5: string, COLUMN6:
> string, COLUMN7: int, COLUMN8: int, COLUMN9: string, COLUMN10: int,
> COLUMN11: int, COLUMN12: int, COLUMN13: string, COLUMN14: string, COLUMN15:
> string, COLUMN16: string, COLUMN17: string, COLUMN18: string, COLUMN19:
> string, COLUMN20: string, COLUMN21: string, COLUMN22: string]
>
> scala> lonadepodf.select("COLUMN1").show(10)
> 15/12/28 03:38:01 INFO MemoryStore: ensureFreeSpace(216384) called with
> curMem=0, maxMem=278302556
> 15/12/28 03:38:01 INFO MemoryStore: Block broadcast_0 stored as values in
> memory (estimated size 211.3 KB, free 265.2 MB)
>
> ...
> 15/12/28 03:38:07 INFO DAGScheduler: ResultStage 2 (show at :33)
> finished in 0.653 s
> 15/12/28 03:38:07 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks
> have all completed, from pool
> 15/12/28 03:38:07 INFO DAGScheduler: Job 2 finished: show at :33,
> took 0.669388 s
> +-------+
> |COLUMN1|
> +-------+
> +-------+
>
>
> scala> val loandepoSchema = StructType(Seq(
>  | StructField("COLUMN1", StringType, true),
>  | StructField("COLUMN2", StringType  , true),
>  | StructField("COLUMN3", StringType , true),
>  | StructField("COLUMN4", StringType , true),
>  | StructField("COLUMN5", StringType , true),
>  | StructField("COLUMN6", StringType, true),
>  | StructField("COLUMN7", StringType, true),
>  | StructField("COLUMN8", StringType, true),
>  | StructField("COLUMN9", StringType, true),
>  | 

Re: DataFrame Save is writing just column names while saving

2015-12-27 Thread Divya Gehlot
Finally able to resolve the issue.
For a sample example with a small dataset, it is creating some 200 output
files. I was just doing a random file check in the output directory and,
alas, kept picking files that contained only the column names.
Attaching the output files now.
Now another question arises: why are so many (200) output files getting
created for such a small data set?
Attaching the dataset files too.
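
A short note on the 200 files, as a sketch rather than a confirmed diagnosis:
the join introduces a shuffle, and spark.sql.shuffle.partitions defaults to
200 in Spark 1.4.x, so the result is written as roughly 200 part files. Two
ways to get fewer files (the path is the one used above):

// Lower the number of shuffle partitions before running the join ...
hiveContext.setConf("spark.sql.shuffle.partitions", "1")
// ... or coalesce the result just before writing it out.
joineddf.coalesce(1)
  .write.format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/tmp/TestDivya/CarUserData.csv")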

On 28 December 2015 at 13:29, Divya Gehlot <divya.htco...@gmail.com> wrote:

> yes
> Sharing the execution flow
>
> 15/12/28 00:19:15 INFO SessionState: No Tez session required at this
> point. hive.execution.engine=mr.
> 15/12/28 00:19:15 INFO SparkILoop: Created sql context (with Hive
> support)..
> SQL context available as sqlContext.
>
> scala> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.HiveContext
>
> scala> import org.apache.spark.sql.hive.orc._
> import org.apache.spark.sql.hive.orc._
>
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> 15/12/28 00:20:15 WARN SparkConf: The configuration key
> 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark
> 1.3 and and may be removed in the future. Please use the new key
> 'spark.yarn.am.waitTime' instead.
> 15/12/28 00:20:15 INFO HiveContext: Initializing execution hive, version
> 0.13.1
> hiveContext: org.apache.spark.sql.hive.HiveContext =
> org.apache.spark.sql.hive.HiveContext@9046f81
>
> scala> import org.apache.spark.sql.types.{StructType, StructField,
> StringType, IntegerType,FloatType ,LongType ,TimestampType };
> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> IntegerType, FloatType, LongType, TimestampType}
>
> scala> val carsSchema = StructType(Seq(StructField("year", IntegerType,
> true),StructField("make", StringType, true),StructField("model",
> StringType, true),StructField("comment", StringType,
> true),StructField("blank", StringType, true)))
> carsSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField(year,IntegerType,true),
> StructField(make,StringType,true), StructField(model,StringType,true),
> StructField(comment,StringType,true), StructField(blank,StringType,true))
>
> scala> val carsdf =
> hiveContext.read.format("com.databricks.spark.csv").option("header",
> "true").schema(carsSchema).load("/tmp/TestDivya/cars.csv")
> 15/12/28 00:20:45 INFO HiveContext: Initializing HiveMetastoreConnection
> version 0.13.1 using Spark classes.
> carsdf: org.apache.spark.sql.DataFrame = [year: int, make: string, model:
> string, comment: string, blank: string]
>
> scala> val carUsersSchema = StructType(Seq(StructField("Name", StringType,
> true),StructField("Car_Model", StringType  , true)))
> carUsersSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField(Name,StringType,true),
> StructField(Car_Model,StringType,true))
>
> scala> val carUsersdf =
> hiveContext.read.format("com.databricks.spark.csv").option("header",
> "false").schema(carUsersSchema).load("/tmp/TestDivya/CarUsers.csv")
> carUsersdf: org.apache.spark.sql.DataFrame = [Name: string, Car_Model:
> string]
>
> scala> val joineddf = (carsdf.join(carUsersdf, carsdf("model") ===
> carUsersdf("Car_Model"))).select(carUsersdf("Name"),carsdf("make"),carUsersdf("Car_Model"))
> joineddf: org.apache.spark.sql.DataFrame = [Name: string, make: string,
> Car_Model: string]
>
> scala> joineddf.collect.foreach(println)
> 
> ..
>
> 15/12/28 00:21:35 INFO DAGScheduler: ResultStage 3 (collect at
> :39) finished in 2.261 s
> 15/12/28 00:21:35 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks
> have all completed, from pool
> 15/12/28 00:21:35 INFO DAGScheduler: Job 1 finished: collect at
> :39, took 5.323441 s
> [Name3,Chevy,Volt]
> [Name6,Chevy,Volt]
> [Name1,Tesla,S]
> [Name4,Tesla,S]
> [Name2,Ford,E350]
> [Name5,Ford,E350]
>
> scala>
>
>
> scala> joineddf.write.format("com.databricks.spark.csv").option("header",
> "true").save("/tmp/TestDivya/CarUserData.csv")
> 15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
> 15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
> ..
> ..........
> 15/12/28 00:25:40 INFO YarnScheduler: Removed TaskSet 6.0, whose tasks
> have all completed, from pool
> 15/12/28 00:25:40 INFO DAG

[Spark 1.4.1] StructField for date column in CSV file while creating custom schema

2015-12-28 Thread Divya Gehlot
Hi,
I am a newbie to Spark.
My apologies for such a naive question.
I am using Spark 1.4.1 and writing code in Scala. I have input data as a CSV
file which I am parsing using the spark-csv package. I am creating a custom
schema to process the CSV file.
Now my query is: which datatype, or rather which StructField, should I use
for the date column of my CSV file?
I am using a hivecontext and have a requirement to create a hive table after
processing the CSV file.
For example, my date column in the CSV file looks like:

25/11/2014 20/9/2015 25/10/2015 31/10/2012 25/9/2013 25/11/2012 20/10/2013
25/10/2011
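
For dates in this d/M/yyyy form, one possible approach (a sketch under the
assumption that the slash format cannot be changed upstream) is to declare
the column as StringType in the custom schema and convert it to
java.sql.Date afterwards; the pattern and column name below are assumptions:

import java.sql.Date
import java.text.SimpleDateFormat

// Assumed pattern for values like 25/11/2014; adjust to the actual data.
val fmt = new SimpleDateFormat("dd/MM/yyyy")
def toSqlDate(s: String): Date = new Date(fmt.parse(s).getTime)

// The converted value can then back a DateType column, for example a field
// declared as StructField("my_date_col", DateType, true) in a hand-built
// schema applied via createDataFrame.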


map spark.driver.appUIAddress IP to different IP

2015-12-28 Thread Divya Gehlot
Hi,

I have an HDP 2.3.2 cluster installed in Amazon EC2.

I want to update the IP address of spark.driver.appUIAddress, which is
currently mapped to the private IP of EC2.

I searched in the Spark config in Ambari but couldn't find the
spark.driver.appUIAddress property.

Because of this private IP mapping, the Spark web UI page is not getting
displayed.

Would really appreciate the help.

Thanks,

Divya


configure spark for hive context

2015-12-21 Thread Divya Gehlot
Hi,
I am trying to configure Spark for a hive context (please don't mistake this
for Hive on Spark).
I placed hive-site.xml in spark/CONF_DIR.
Now when I run spark-shell I am getting the error below.
Versions which I am using:

Hadoop 2.6.2, Spark 1.5.2, Hive 1.2.1


Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.5.2
>   /_/
>
> Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_66)
> Type in expressions to have them evaluated.
> Type :help for more information.
> Spark context available as sc.
> java.lang.RuntimeException: java.lang.IllegalArgumentException:
> java.net.URISyntaxException: Relative path in absolute URI:
> ${system:java.io.tmpdir%7D/$%7Bsystem:user.name%7D
> at
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
> at
> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
> at
> org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162)
> at
> org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160)
> at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:167)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> at
> org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
> at $iwC$$iwC.(:9)
> at $iwC.(:18)
> at (:20)
> at .(:24)
> at .()
> at .(:7)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
> at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
> at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
> at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
> at
> org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:132)
> at
> org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:124)
> at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324)
> at
> org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:124)
> at
> org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:974)
> at
> org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:159)
> at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64)
> at
> org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:108)
> at
> org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:64)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:991)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.org
> $apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
> at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
> at
> 
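
A commonly reported cause of this particular URISyntaxException (an
assumption here, not something confirmed in this thread) is that the
hive-site.xml copied into Spark's conf directory still contains Hive 1.2.1's
${system:java.io.tmpdir}/${system:user.name} placeholders, which Spark cannot
resolve. A sketch of the kind of change usually suggested, replacing the
placeholders with a concrete path (/tmp/hive is hypothetical, and
hive.exec.local.scratchdir is only one of several properties that use these
placeholders):

<property>
  <name>hive.exec.local.scratchdir</name>
  <value>/tmp/hive</value>
</property>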

error while defining custom schema in Spark 1.5.0

2015-12-22 Thread Divya Gehlot
Hi,
I am a newbie to Apache Spark, using the CDH 5.5 QuickStart VM, which has
Spark 1.5.0.
I am working on a custom schema and getting an error:

import org.apache.spark.sql.hive.HiveContext
>>
>> scala> import org.apache.spark.sql.hive.orc._
>> import org.apache.spark.sql.hive.orc._
>>
>> scala> import org.apache.spark.sql.types.{StructType, StructField,
>> StringType, IntegerType};
>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>> IntegerType}
>>
>> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> 15/12/21 23:41:53 INFO hive.HiveContext: Initializing execution hive,
>> version 1.1.0
>> 15/12/21 23:41:53 INFO client.ClientWrapper: Inspected Hadoop version:
>> 2.6.0-cdh5.5.0
>> 15/12/21 23:41:53 INFO client.ClientWrapper: Loaded
>> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.5.0
>> hiveContext: org.apache.spark.sql.hive.HiveContext =
>> org.apache.spark.sql.hive.HiveContext@214bd538
>>
>> scala> val customSchema = StructType(Seq(StructField("year", IntegerType,
>> true),StructField("make", StringType, true),StructField("model",
>> StringType, true),StructField("comment", StringType,
>> true),StructField("blank", StringType, true)))
>> customSchema: org.apache.spark.sql.types.StructType =
>> StructType(StructField(year,IntegerType,true),
>> StructField(make,StringType,true), StructField(model,StringType,true),
>> StructField(comment,StringType,true), StructField(blank,StringType,true))
>>
>> scala> val customSchema = (new StructType).add("year", IntegerType,
>> true).add("make", StringType, true).add("model", StringType,
>> true).add("comment", StringType, true).add("blank", StringType, true)
>> customSchema: org.apache.spark.sql.types.StructType =
>> StructType(StructField(year,IntegerType,true),
>> StructField(make,StringType,true), StructField(model,StringType,true),
>> StructField(comment,StringType,true), StructField(blank,StringType,true))
>>
>> scala> val customSchema = StructType( StructField("year", IntegerType,
>> true) :: StructField("make", StringType, true) :: StructField("model",
>> StringType, true) :: StructField("comment", StringType, true) ::
>> StructField("blank", StringType, true)::StructField("blank", StringType,
>> true))
>> :24: error: value :: is not a member of
>> org.apache.spark.sql.types.StructField
>>val customSchema = StructType( StructField("year", IntegerType,
>> true) :: StructField("make", StringType, true) :: StructField("model",
>> StringType, true) :: StructField("comment", StringType, true) ::
>> StructField("blank", StringType, true)::StructField("blank", StringType,
>> true))
>>
>
Tried like like below also

scala> val customSchema = StructType( StructField("year", IntegerType,
true), StructField("make", StringType, true) ,StructField("model",
StringType, true) , StructField("comment", StringType, true) ,
StructField("blank", StringType, true),StructField("blank", StringType,
true))
:24: error: overloaded method value apply with alternatives:
  (fields:
Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType

  (fields:
java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType

  (fields:
Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField,
org.apache.spark.sql.types.StructField,
org.apache.spark.sql.types.StructField,
org.apache.spark.sql.types.StructField,
org.apache.spark.sql.types.StructField,
org.apache.spark.sql.types.StructField)
   val customSchema = StructType( StructField("year", IntegerType,
true), StructField("make", StringType, true) ,StructField("model",
StringType, true) , StructField("comment", StringType, true) ,
StructField("blank", StringType, true),StructField("blank", StringType,
true))
  ^
Would really appreciate it if somebody could share an example which works
with Spark 1.4 or Spark 1.5.0.

Thanks,
Divya
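
For reference, a minimal sketch of the cons-style construction: the :: chain
needs a terminating Nil so that the expression builds a List[StructField];
without it, :: is looked up on the last StructField, which produces the error
shown above.

val customSchema = StructType(
  StructField("year", IntegerType, true) ::
  StructField("make", StringType, true) ::
  StructField("model", StringType, true) ::
  StructField("comment", StringType, true) ::
  StructField("blank", StringType, true) :: Nil)   // Nil closes the List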



queries on Spork (Pig on Spark)

2015-11-24 Thread Divya Gehlot
>
> Hi,


As a beginner, I have the queries below on Spork (Pig on Spark).
I have cloned it with: git clone https://github.com/apache/pig -b spark
1. On which versions of Pig and Spark is Spork being built?
2. I followed the steps mentioned in
https://issues.apache.org/jira/browse/PIG-4059 and tried to run a simple pig
script that just loads a file and dumps/stores it.
I am getting these errors:

>
grunt> A = load '/tmp/words_tb.txt' using PigStorage('\t') as
(empNo:chararray,empName:chararray,salary:chararray);
grunt> Store A into
'/tmp/spork';

2015-11-25 05:35:52,502 [main] INFO
org.apache.pig.tools.pigstats.ScriptState - Pig features used in the
script: UNKNOWN
2015-11-25 05:35:52,875 [main] WARN  org.apache.pig.data.SchemaTupleBackend
- SchemaTupleBackend has already been initialized
2015-11-25 05:35:52,883 [main] INFO
org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - Not MR
mode. RollupHIIOptimizer is disabled
2015-11-25 05:35:52,894 [main] INFO
org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer -
{RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator,
GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter,
MergeFilter, MergeForEach, PartitionFilterOptimizer,
PredicatePushdownOptimizer, PushDownForEachFlatten, PushUpFilter,
SplitFilter, StreamTypeCastInserter]}
2015-11-25 05:35:52,966 [main] INFO  org.apache.pig.data.SchemaTupleBackend
- Key [pig.schematuple] was not set... will not generate code.
2015-11-25 05:35:52,983 [main] INFO
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher - add
Files Spark Job
2015-11-25 05:35:53,137 [main] INFO
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher - Added
jar pig-0.15.0-SNAPSHOT-core-h2.jar
2015-11-25 05:35:53,138 [main] INFO
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher - Added
jar pig-0.15.0-SNAPSHOT-core-h2.jar
2015-11-25 05:35:53,138 [main] INFO
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher -
Converting operator POLoad (Name: A:
Load(/tmp/words_tb.txt:PigStorage(' ')) - scope-29 Operator Key: scope-29)
2015-11-25 05:35:53,205 [main] ERROR org.apache.pig.tools.grunt.Grunt -
ERROR 2998: Unhandled internal error. Could not initialize class
org.apache.spark.rdd.RDDOperationScope$
Details at logfile: /home/pig/pig_1448425672112.log


Can you please help me by pointing out what's wrong?

I appreciate your help.

Thanks,

Regards,

Divya


Re: queries on Spork (Pig on Spark)

2015-11-24 Thread Divya Gehlot
Log files content :
Pig Stack Trace
---
ERROR 2998: Unhandled internal error. Could not initialize class
org.apache.spark.rdd.RDDOperationScope$
java.lang.NoClassDefFoundError: Could not initialize class
org.apache.spark.rdd.RDDOperationScope$
 at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
 at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:1094)
 at
org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter.convert(LoadConverter.java:91)
 at
org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter.convert(LoadConverter.java:61)
 at
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:666)
 at
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:633)
 at
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:633)
 at
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.sparkOperToRDD(SparkLauncher.java:585)
 at
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.sparkPlanToRDD(SparkLauncher.java:534)
 at
org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.launchPig(SparkLauncher.java:209)
 at
org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.launchPig(HExecutionEngine.java:301)
 at org.apache.pig.PigServer.launchPlan(PigServer.java:1390)
 at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1375)
 at org.apache.pig.PigServer.storeEx(PigServer.java:1034)
 at org.apache.pig.PigServer.store(PigServer.java:997)
 at org.apache.pig.PigServer.openIterator(PigServer.java:910)
 at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:754)
 at
org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:376)
 at
org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
 at
org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
 at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:66)
 at org.apache.pig.Main.run(Main.java:558)
 at org.apache.pig.Main.main(Main.java:170)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
 at org.apache.hadoop.util.RunJar.main(RunJar.java:136)


I don't understand the problem behind this error.

Thanks,
Regards,
Divya

On 25 November 2015 at 14:00, Jeff Zhang <zjf...@gmail.com> wrote:

> >>> Details at logfile: /home/pig/pig_1448425672112.log
>
> You need to check the log file for details
>
>
>
>
> On Wed, Nov 25, 2015 at 1:57 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>>
>>
>> As a beginner ,I have below queries on Spork(Pig on Spark).
>> I have cloned  git clone https://github.com/apache/pig -b spark .
>> 1.On which version of Pig and Spark , Spork  is being built ?
>> 2. I followed the steps mentioned in https://issues.apache.org/jira/browse/PIG-4059
>> and try to run simple pig script just like Load the
>> file and dump/store it.
>> Getting errors :
>>
>>>
>> grunt> A = load '/tmp/words_tb.txt' using PigStorage('\t') as
>> (empNo:chararray,empName:chararray,salary:chararray);
>> grunt> Store A into
>> '/tmp/spork';
>>
>> 2015-11-25 05:35:52,502 [main] INFO
>> org.apache.pig.tools.pigstats.ScriptState - Pig features used in the
>> script: UNKNOWN
>> 2015-11-25 05:35:52,875 [main] WARN
>> org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already
>> been initialized
>> 2015-11-25 05:35:52,883 [main] INFO
>> org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - Not MR
>> mode. RollupHIIOptimizer is disabled
>> 2015-11-25 05:35:52,894 [main] INFO
>> org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer -
>> {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator,
>> GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter,
>> MergeFilter, MergeForEach, PartitionFilterOptimizer,
>> PredicatePushdownOptimizer, PushDownForEachFlatten, PushUpFilter,
>> SplitFilter, StreamTypeCastInserter]}
>> 2015-11-25 05:35:52,966 [main] INFO
>> org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not
>> set... will not generate code.
>> 2015-11-25 05:35:52,983 [main] INFO
>> org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher - add
>> Files Spark Job
>> 2015-11-25 05:35:53,137 [main] INFO
>> 

pass one dataframe column value to another dataframe filter expression + Spark 1.5 + scala

2016-02-04 Thread Divya Gehlot
Hi,
I have two input datasets
First input dataset like as below :

year,make,model,comment,blank
> "2012","Tesla","S","No comment",
> 1997,Ford,E350,"Go get one now they are going fast",
> 2015,Chevy,Volt


Second Input dataset :

TagId,condition
> 1997_cars,year = 1997 and model = 'E350'
> 2012_cars,year=2012 and model ='S'
> 2015_cars ,year=2015 and model = 'Volt'


Now my requirement is to read the first dataset and, based on the filtering
conditions in the second dataset, tag rows of the first dataset by
introducing a new column TagId to the first input dataset,
so the expected output should look like:

year,make,model,comment,blank,TagId
> "2012","Tesla","S","No comment",2012_cars
> 1997,Ford,E350,"Go get one now they are going fast",1997_cars
> 2015,Chevy,Volt, ,2015_cars


I tried like :

val sqlContext = new SQLContext(sc)
> val carsSchema = StructType(Seq(
> StructField("year", IntegerType, true),
> StructField("make", StringType, true),
> StructField("model", StringType, true),
> StructField("comment", StringType, true),
> StructField("blank", StringType, true)))
>
> val carTagsSchema = StructType(Seq(
> StructField("TagId", StringType, true),
> StructField("condition", StringType, true)))
>
>
> val dfcars =
> sqlContext.read.format("com.databricks.spark.csv").option("header", "true")
> .schema(carsSchema).load("/TestDivya/Spark/cars.csv")
> val dftags =
> sqlContext.read.format("com.databricks.spark.csv").option("header", "true")
> .schema(carTagsSchema).load("/TestDivya/Spark/CarTags.csv")
>
> val Amendeddf = dfcars.withColumn("TagId", dfcars("blank"))
> val cdtnval = dftags.select("condition")
> val df2=dfcars.filter(cdtnval)
> :35: error: overloaded method value filter with alternatives:
>   (conditionExpr: String)org.apache.spark.sql.DataFrame 
>   (condition:
> org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
>  cannot be applied to (org.apache.spark.sql.DataFrame)
>val df2=dfcars.filter(cdtnval)


another way :

val col = dftags.col("TagId")
> val finaldf = dfcars.withColumn("TagId", col)
> org.apache.spark.sql.AnalysisException: resolved attribute(s) TagId#5
> missing from comment#3,blank#4,model#2,make#1,year#0 in operator !Project
> [year#0,make#1,model#2,comment#3,blank#4,TagId#5 AS TagId#8];
>
> finaldf.write.format("com.databricks.spark.csv").option("header",
> "true").save("/TestDivya/Spark/carswithtags.csv")



Would really appreciate it if somebody could give me pointers on how I can pass the
filter condition (from the second dataframe) to the filter function of the first dataframe,
or suggest another solution.
My apologies for such a naive question, as I am new to Scala and Spark.

Thanks
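
One possible approach (a sketch only, reusing dfcars and dftags from above; it
assumes every row matches exactly one condition): collect the small tag table to
the driver, filter the cars once per condition string, tag each slice with lit(),
and union the slices back together.

import org.apache.spark.sql.functions.lit

val tagRules = dftags.collect()                 // Array[Row] of (TagId, condition)

val tagged = tagRules.map { row =>
    val tagId     = row.getString(0)
    val condition = row.getString(1)            // e.g. "year = 1997 and model = 'E350'"
    dfcars.filter(condition).withColumn("TagId", lit(tagId))
  }.reduce((a, b) => a.unionAll(b))

tagged.show()

Rows that match no rule are dropped by this sketch; keeping them would need a
default slice or a different approach (for example a chained when expression).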


add new column in the schema + Dataframe

2016-02-04 Thread Divya Gehlot
Hi,
I am a beginner in Spark and am using Spark 1.5.2 on YARN (HDP 2.3.4).
I have a use case where I have to read two input files and, based on certain
conditions in the second input file, add a new column to the first
input file and save it.

I am using spark-csv to read my input files.
Would really appreciate it if somebody would share their thoughts on the
best/feasible way of doing it (using the dataframe API).


Thanks,
Divya


Passing a dataframe to where clause + Spark SQL

2016-02-10 Thread Divya Gehlot
Hi,
//Loading all the DB Properties
val options1 = Map("url" ->
"jdbc:oracle:thin:@xx.xxx.xxx.xx:1521:dbname","user"->"username","password"->"password","dbtable"
-> "TESTCONDITIONS")
val testCond  = sqlContext.load("jdbc",options1 )
val condval = testCond.select("Cond")

testCond.show()
val options2 = Map("url" ->
"jdbc:oracle:thin:@xx.xxx.xxx.xx:1521:dbanme","user"->"username","password"->"password","dbtable"
-> "Test")
val test= sqlContext.load("jdbc",options2 )
test.select.where(condval ) //gives error as cannot convert sql.Column to
Dataframe

test.select().where(???)

My TestConditions table has only one row,
which looks like: year = 1965 and month = 'december'

Can I convert the sql.Column to a list and pass that?
I am new to Spark and Scala.


Will really appreciate the help.

Thanks,
Divya
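
A sketch of one workaround, assuming the TESTCONDITIONS row really holds a plain
SQL predicate string: pull the string down to the driver and hand it to filter(),
which accepts a condition expression as a String.

val condString = testCond.select("Cond").first().getString(0)   // "year = 1965 and month = 'december'"
val filtered   = test.filter(condString)
filtered.show()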


Spark : Unable to connect to Oracle

2016-02-10 Thread Divya Gehlot
Hi,
I am a newbie to Spark and am using Spark version 1.5.2.
I am trying to connect to an Oracle DB using the Spark API and getting errors.
Steps I followed :
Step 1- I placed the ojdbc6.jar in
/usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar
Step 2- Registered the jar file
sc.addJar("/usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar")
16/02/10 04:27:55 INFO SparkContext: Added JAR
/usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar at
http://xxx.xx.xx.xx:37460/jars/ojdbc6.jar with timestamp 1455096475089

Step 3 :
val loanDepo = sqlContext.load("jdbc", Map("driver" ->
"oracle.jdbc.driver.OracleDriver","url" ->
"jdbc:oracle:thin:@xx.xxx.xxx.xx:1521:dbanme","dbtable"
-> "testtable","user"->"username", "password"->"password"))

> warning: there were 1 deprecation warning(s); re-run with -deprecation for
> details
> java.lang.ClassNotFoundException: oracle.jdbc.driver.OracleDriver
> at
> scala.tools.nsc.interpreter.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:83)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at
> org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:38)
> at
> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:41)
> at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
> at org.apache.spark.sql.DataFrameReader.load(Dat



Would really appreciate it if somebody could point out the errors in the above
steps, or point me to the correct steps.


Thanks,
Divya
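
One common cause of this ClassNotFoundException: sc.addJar makes the jar available
to the executors, but the JDBC driver registry on the driver side typically needs
the jar on the driver's classpath at JVM startup. A possible fix (same pattern as
the spark-shell commands used elsewhere in these threads) is to pass the jar when
launching the shell, for example:

spark-shell --jars /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar \
  --driver-class-path /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar

After that, the same sqlContext.load("jdbc", Map(...)) call should be able to
resolve oracle.jdbc.driver.OracleDriver.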


how to calculate -- executor-memory,num-executors,total-executor-cores

2016-02-02 Thread Divya Gehlot
Hi,

I would like to know how to calculate how much --executor-memory we should
allocate, and how many --num-executors and --total-executor-cores we should give while
submitting Spark jobs.
Is there any formula for it?


Thanks,
Divya
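
There is no exact formula in Spark itself; a commonly used rule of thumb (an
assumption here, not an official guideline) is:

1. Reserve 1 core and roughly 1 GB RAM per node for the OS and Hadoop daemons.
2. Use about 5 cores per executor (--executor-cores 5) for good HDFS throughput.
3. Executors per node = (usable cores per node) / 5.
4. Executor memory = (usable RAM per node) / (executors per node), minus
   roughly 7-10% for spark.yarn.executor.memoryOverhead.
5. --num-executors = executors per node * number of worker nodes, minus 1
   for the YARN application master.

Worked example for 3 worker nodes with 16 cores and 64 GB each:
executors per node = (16 - 1) / 5 = 3
executor memory    = (64 - 1) / 3 = 21 GB, so roughly 19g after overhead
num-executors      = 3 * 3 - 1 = 8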


Re: Dynamic sql in Spark 1.5

2016-02-02 Thread Divya Gehlot
Hi,
I have data set like :
Dataset 1
HeaderCol1 HeadCol2 HeadCol3
 dataset 1 dataset2 dataset 3
dataset 11 dataset13 dataset 13
dataset 21 dataset22 dataset 23

Datset 2
HeadColumn1 HeadColumn2HeadColumn3 HeadColumn4
Tag1  Dataset1
Tag2  Dataset1   Dataset2
Tag3  Dataset1  Dataset2   Dataset3
Tag4 DifferentDataset1
Tag5 DifferentDataset1   DifferentDataset2
Tag6 DifferentDataset1DifferentDataset2
DifferentDataset3


My requirement is to tag the dataset (adding one more column) based on dataset 1.


Can I implement this in Spark?
In an RDBMS we have implemented it using dynamic SQL.

Would really appreciate the help.


Thanks,
Divya





On 3 February 2016 at 11:42, Ali Tajeldin EDU <alitedu1...@gmail.com> wrote:

> While you can construct the SQL string dynamically in scala/java/python,
> it would be best to use the Dataframe API for creating dynamic SQL
> queries.  See
> http://spark.apache.org/docs/1.5.2/sql-programming-guide.html for details.
>
> On Feb 2, 2016, at 6:49 PM, Divya Gehlot <divya.htco...@gmail.com> wrote:
>
> Hi,
> Does Spark supports dyamic sql ?
> Would really appreciate the help , if any one could share some
> references/examples.
>
>
>
> Thanks,
> Divya
>
>
>


Dynamic sql in Spark 1.5

2016-02-02 Thread Divya Gehlot
Hi,
Does Spark support dynamic SQL?
Would really appreciate the help, if anyone could share some
references/examples.



Thanks,
Divya


[Query] : How to read null values in Spark 1.5.2

2016-02-24 Thread Divya Gehlot
Hi,
I have a data set (the source is a database) which has null values.
When I define the custom schema with any type except string type,
I get a number format exception on the null values.
Has anybody come across this kind of scenario?
Would really appreciate it if you could share your resolution or workaround.

Thanks,
Divya


[Vote] : Spark-csv 1.3 + Spark 1.5.2 - Error parsing null values except String data type

2016-02-23 Thread Divya Gehlot
Hi,

Please vote if you have ever faced this issue.
I am getting an error when parsing null values with spark-csv.
DataFile :
name age
alice 35
bob null
peter 24
Code :
 spark-shell  --packages com.databricks:spark-csv_2.10:1.3.0  --master
yarn-client -i /TestDivya/Spark/Testnull.scala

Testnull.scala

> import org.apache.spark.sql.types.{StructType, StructField, NullType,
> DateType, IntegerType, LongType, DoubleType, FloatType, StringType};
> import java.util.Properties
> import org.apache.spark._
> import org.apache.spark.sql._
>
> val testnullSchema = StructType(List(
> StructField("name", StringType, false),
>  StructField("age", IntegerType, true)))
> val dfreadnull =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").option("nullValue","").option("treatEmptyValuesAsNulls","true").schema(testnullSchema).load("hdfs://
> 172.31.29.201:8020/TestDivya/Spark/nulltest1.csv")



Has anybody faced a similar issue reading a csv file which has null values in
fields of datatypes other than String?

I googled it and found that the issue is still open in the spark-csv GitHub repo.


Thanks,
Divya


[Help]: DataframeNAfunction fill method throwing exception

2016-02-25 Thread Divya Gehlot
Hi,
I have a dataset which looks like below:
name age
alice 35
bob null
peter 24
I need to replace the null values in the columns with 0,
so I referred to the Spark API DataframeNAfunctions.scala.


I tried the below code and it's throwing an exception:
scala> import org.apache.spark.sql.types.{StructType, StructField,
StringType,IntegerType,LongType,DoubleType, FloatType};
import org.apache.spark.sql.types.{StructType, StructField, StringType,
IntegerType, LongType, DoubleType, FloatType}

scala> val nulltestSchema = StructType(Seq(StructField("name", StringType,
false),StructField("age", DoubleType, true)))
nulltestSchema: org.apache.spark.sql.types.StructType =
StructType(StructField(name,StringType,false),
StructField(age,DoubleType,true))

scala> val dfnulltest =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").schema(nulltestSchema).load("hdfs://
172.31.29.201:8020/TestDivya/Spark/nulltest.csv")
dfnulltest: org.apache.spark.sql.DataFrame = [name: string, age: double]

scala> val dfchangenull =
dfnulltest.na.fill(0,Seq("age")).select("name","age")
dfchangenull: org.apache.spark.sql.DataFrame = [name: string, age: double]

scala> dfchangenull.show
16/02/25 23:15:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2,
ip-172-31-22-135.ap-southeast-1.compute.internal):
java.text.ParseException: Unparseable number: "null"
at java.text.NumberFormat.parse(NumberFormat.java:350)


Re: [Help]: DataframeNAfunction fill method throwing exception

2016-02-25 Thread Divya Gehlot
Hi Jan ,
Thanks for help.
Alas..
you suggestion also didnt work

scala> import org.apache.spark.sql.types.{StructType, StructField,
> StringType,IntegerType,LongType,DoubleType, FloatType};
> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> IntegerType, LongType, DoubleType, FloatType}
> scala> val nulltestSchema = StructType(Seq(StructField("name", StringType,
> false),StructField("age", DoubleType, true)))
> nulltestSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField(name,StringType,false),
> StructField(age,DoubleType,true))
> scala> val dfnulltest =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").schema(nulltestSchema).load("hdfs://xx.xx.xx.xxx:8020/TestDivya/Spark/nulltest.csv")
> dfnulltest: org.apache.spark.sql.DataFrame = [name: string, age: double]
> scala> dfnulltest.selectExpr("name", "coalesce(age, 0) as age")
> res0: org.apache.spark.sql.DataFrame = [name: string, age: double]
> scala> val dfresult = dfnulltest.selectExpr("name", "coalesce(age, 0) as
> age")
> dfresult: org.apache.spark.sql.DataFrame = [name: string, age: double]
> scala> dfresult.show


 java.text.ParseException: Unparseable number: "null"
at java.text.NumberFormat.parse(NumberFormat.java:350)


On 26 February 2016 at 15:15, Jan Štěrba <i...@jansterba.com> wrote:

> just use coalesce function
>
> df.selectExpr("name", "coalesce(age, 0) as age")
>
> --
> Jan Sterba
> https://twitter.com/honzasterba | http://flickr.com/honzasterba |
> http://500px.com/honzasterba
>
> On Fri, Feb 26, 2016 at 5:27 AM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>> I have dataset which looks like below
>> name age
>> alice 35
>> bob null
>> peter 24
>> I need to replace null values of columns with 0
>> so  I referred Spark API DataframeNAfunctions.scala
>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala>
>>
>>  I tried the below code its throwing exception
>> scala> import org.apache.spark.sql.types.{StructType, StructField,
>> StringType,IntegerType,LongType,DoubleType, FloatType};
>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>> IntegerType, LongType, DoubleType, FloatType}
>>
>> scala> val nulltestSchema = StructType(Seq(StructField("name",
>> StringType, false),StructField("age", DoubleType, true)))
>> nulltestSchema: org.apache.spark.sql.types.StructType =
>> StructType(StructField(name,StringType,false),
>> StructField(age,DoubleType,true))
>>
>> scala> val dfnulltest =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").schema(nulltestSchema).load("hdfs://
>> 172.31.29.201:8020/TestDivya/Spark/nulltest.csv")
>> dfnulltest: org.apache.spark.sql.DataFrame = [name: string, age: double]
>>
>> scala> val dfchangenull =
>> dfnulltest.na.fill(0,Seq("age")).select("name","age")
>> dfchangenull: org.apache.spark.sql.DataFrame = [name: string, age: double]
>>
>> scala> dfchangenull.show
>> 16/02/25 23:15:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2,
>> ip-172-31-22-135.ap-southeast-1.compute.internal):
>> java.text.ParseException: Unparseable number: "null"
>> at java.text.NumberFormat.parse(NumberFormat.java:350)
>>
>>
>
>
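
A possible workaround (a sketch, assuming the data file literally contains the text
"null" in the age field, which spark-csv 1.x then tries to parse as a number): read
the column as a string, cast it afterwards (the cast turns anything non-numeric into
a real SQL NULL), and only then apply na.fill.

import org.apache.spark.sql.types.{StructType, StructField, StringType}

val rawSchema = StructType(Seq(
  StructField("name", StringType, false),
  StructField("age",  StringType, true)))       // read age as a string on purpose

val dfraw = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(rawSchema)
  .load("hdfs:///TestDivya/Spark/nulltest.csv")

// cast("double") yields NULL for the literal string "null", which na.fill replaces
val dfchangenull = dfraw
  .select(dfraw("name"), dfraw("age").cast("double").as("age"))
  .na.fill(0, Seq("age"))

dfchangenull.show()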


Re: which master option to view current running job in Spark UI

2016-02-24 Thread Divya Gehlot
Hi Jeff ,

The issue was with viewing the logs on EC2.
I had to set up SSH tunnels to view the currently running job.


Thanks,
Divya

On 24 February 2016 at 10:33, Jeff Zhang <zjf...@gmail.com> wrote:

> View running job in SPARK UI doesn't matter which master you use.  What do
> you mean "I cant see the currently running jobs in Spark WEB UI" ? Do you
> see a blank spark ui or can't open the spark ui ?
>
> On Mon, Feb 15, 2016 at 12:55 PM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> When running in YARN, you can use the YARN Resource Manager UI to get to
>> the ApplicationMaster url, irrespective of client or cluster mode.
>>
>> Regards
>> Sab
>> On 15-Feb-2016 10:10 am, "Divya Gehlot" <divya.htco...@gmail.com> wrote:
>>
>>> Hi,
>>> I have Hortonworks 2.3.4 cluster on EC2 and Have spark jobs as scala
>>> files .
>>> I am bit confused between using *master  *options
>>> I want to execute this spark job in YARN
>>>
>>> Curently running as
>>> spark-shell --properties-file  /TestDivya/Spark/Oracle.properties --jars
>>> /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --driver-class-path
>>> /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --packages
>>> com.databricks:spark-csv_2.10:1.1.0  *--master yarn-client *  -i
>>> /TestDivya/Spark/Test.scala
>>>
>>> with this option I cant see the currently running jobs in Spark WEB UI
>>> though it later appear in spark history server.
>>>
>>> My question with which --master option should I run my spark jobs so
>>> that I can view the currently running jobs in spark web UI .
>>>
>>> Thanks,
>>> Divya
>>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


[Help]: Steps to access hive table + Spark 1.5.2 + HbaseIntegration + Hive 1.2 + Hbase 1.1

2016-02-29 Thread Divya Gehlot
Hi,

Can anybody help me by sharing the steps/examples for
how to connect to a Hive table (which was created using HBaseIntegration)
through HiveContext in Spark?
I googled but couldn't find a single example/document.

Would really appreciate the help.



Thanks,
Divya


[Example] : Save dataframes with different schema + Spark 1.5.2 and Dataframe + Spark-CSV package

2016-02-22 Thread Divya Gehlot
Hi,
My use case:
I have two datasets like below:
year make model comment blank Carname
2012 Tesla S No comment
1997 Ford E350 Go get one
 now they are going fast MyFord
2015 Chevy Volt
2016 Mercedes

Datset2
carowner year make model
John 2012 Tesla S
David
Peter 1997 Ford E350
Paul 2015 Chevy Volt

My output should be like
carowner year make model comment blank Carname
John 2012 Tesla S No comment null
David null
Peter 1997 Ford E350 Go get one
 now they are going fast MyFord
Paul 2015 Chevy Volt null
null 2016 Mercedes null null null

How can I achieve this using the dataframe and spark-csv packages?
Would really appreciate the help.


Thanks,
Divya
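
A sketch of one way to do this (the column names and file paths are taken from the
sample data above and are otherwise assumptions): read both files with spark-csv and
do a full outer join on the shared key columns, so rows present on only one side are
kept with nulls on the other side.

val dfcars = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true").option("inferSchema", "true")
  .load("hdfs:///TestDivya/Spark/cars.csv")
val dfowners = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true").option("inferSchema", "true")
  .load("hdfs:///TestDivya/Spark/carowners.csv")

val merged = dfowners.join(dfcars,
    (dfowners("year") === dfcars("year")) &&
    (dfowners("make") === dfcars("make")) &&
    (dfowners("model") === dfcars("model")),
    "outer")
  .select(dfowners("carowner"), dfcars("year"), dfcars("make"), dfcars("model"),
          dfcars("comment"), dfcars("blank"), dfcars("Carname"))

merged.write.format("com.databricks.spark.csv")
  .option("header", "true")
  .save("hdfs:///TestDivya/Spark/merged")

For rows that exist only in the owners file the car-side columns come back as null;
coalesce(dfowners("year"), dfcars("year")) could be used instead if the key values
should be kept from whichever side has them.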


Group by Dynamically

2016-01-24 Thread Divya Gehlot
Hi,
I have two files
File1
Group by Condition
Field1   Y
Field 2   N
Field3 Y

File2 is data file having field1,field2,field3 etc..
field1 field2 field3 field4 field5
data1 data2 data3 data4 data 5
data11 data22 data33 data44 data 55

Now my requirement is to group by based on the conditions in file 1.
For instance, as mentioned above, if field1 and field3 are Y (true),
I need to group by only field1 and field3.

Would really appreciate your help.

Thanks,
Divya
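
A sketch under the assumption that file 1 has two columns (a field name and a Y/N
flag) and both files are readable with spark-csv; the paths and header names are
made up:

val dfcond = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .load("hdfs:///TestDivya/Spark/groupby_conditions.csv")     // columns: field, condition

val groupCols = dfcond.filter("condition = 'Y'")
  .select("field").collect().map(_.getString(0))              // e.g. Array("field1", "field3")

val dfdata = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .load("hdfs:///TestDivya/Spark/data.csv")

// groupBy takes the column names as varargs, so the list can be built at runtime
val grouped = dfdata.groupBy(groupCols.head, groupCols.tail: _*).count()
grouped.show()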


IllegalStateException : When use --executor-cores option in YARN

2016-02-14 Thread Divya Gehlot
Hi,

I am starting spark-shell with following options :
spark-shell --properties-file  /TestDivya/Spark/Oracle.properties --jars
/usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --driver-class-path
/usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --packages
com.databricks:spark-csv_2.10:1.1.0  --master yarn-client --num-executors
10 --executor-cores 4 -i /TestDivya/Spark/Test.scala

I have a few queries:
1. Error:
java.lang.IllegalStateException: SparkContext has been shutdown

If I remove --executor-cores 4, it runs smoothly.

2. With --num-executors 10 my Spark job takes more time.
May I know why?

3. What's the difference between spark-shell and spark-submit?

I am a newbie to Spark. Apologies for such naive questions.
I am just trying to figure out how to tune Spark jobs to increase performance
on a Hadoop cluster on EC2.
If anybody has real-world experience, please help me.


Thanks,
Divya


Difference between spark-shell and spark-submit.Which one to use when ?

2016-02-14 Thread Divya Gehlot
Hi,
I would like to know the difference between spark-shell and spark-submit in
terms of real-world scenarios.

I am using Hadoop cluster with Spark on EC2.


Thanks,
Divya


which master option to view current running job in Spark UI

2016-02-14 Thread Divya Gehlot
Hi,
I have a Hortonworks 2.3.4 cluster on EC2 and have Spark jobs as Scala files.
I am a bit confused between the *master* options.
I want to execute this Spark job on YARN.

Curently running as
spark-shell --properties-file  /TestDivya/Spark/Oracle.properties --jars
/usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --driver-class-path
/usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --packages
com.databricks:spark-csv_2.10:1.1.0  *--master yarn-client *  -i
/TestDivya/Spark/Test.scala

With this option I can't see the currently running jobs in the Spark web UI,
though they later appear in the Spark history server.

My question is: with which --master option should I run my Spark jobs so that I
can view the currently running jobs in the Spark web UI?

Thanks,
Divya


Need help :Does anybody has HDP cluster on EC2?

2016-02-15 Thread Divya Gehlot
Hi,
I have a Hadoop cluster set up on EC2.
I am unable to view the application logs in the web UI as it uses the internal IP,
like below:
http://ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal:8042


How can I change this to the external one, or redirect to the external address?
I've attached screenshots for a better understanding of my issue.

Would really appreciate help.


Thanks,
Divya


Re: Need help :Does anybody has HDP cluster on EC2?

2016-02-15 Thread Divya Gehlot
Hi Sabarish,
Thanks a lot for your help.
I am able to view the logs now.

Thank you very much .

Cheers,
Divya


On 15 February 2016 at 16:51, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> You can setup SSH tunneling.
>
>
> http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-ssh-tunnel.html
>
> Regards
> Sab
>
> On Mon, Feb 15, 2016 at 1:55 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>> I have hadoop cluster set up in EC2.
>> I am unable to view application logs in Web UI as its taking internal IP
>> Like below :
>> http://ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal:8042
>> <http://ip-172-31-22-136.ap-southeast-1.compute.internal:8042/>
>>
>> How can I change this to external one or redirecting to external ?
>> Attached screenshots for better understanding of my issue.
>>
>> Would really appreciate help.
>>
>>
>> Thanks,
>> Divya
>>
>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>
>
> --
>
> Architect - Big Data
> Ph: +91 99805 99458
>
> Manthan Systems | *Company of the year - Analytics (2014 Frost and
> Sullivan India ICT)*
> +++
>


which is better RDD or Dataframe?

2016-02-15 Thread Divya Gehlot
Hi,
I would like to know which gives better performance: RDDs or dataframes?
Take one scenario:
1. Read the file as an RDD, register it as a temp table, and fire a SQL query.

2. Read the file through the Dataframe API, or convert the RDD to a dataframe and
use dataframe APIs to process the data.

For a scenario like the above, which gives better performance?
Does anybody have benchmarks or statistical data regarding that?


Thanks,
Divya


SparkOnHBase : Which version of Spark its available

2016-02-17 Thread Divya Gehlot
Hi,

Which versions of Spark and HBase is SparkOnHBase integrated with?





Thanks,
Divya


Spark JDBC connection - data writing success or failure cases

2016-02-18 Thread Divya Gehlot
Hi,
I have a Spark job which connects to an RDBMS (in my case it's Oracle).
How can we check that the complete data write was successful?
Can I use commit in case of success, or rollback in case of failure?



Thanks,
Divya
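
For what it's worth, Spark's JDBC writer commits each partition's batch separately,
so there is no single job-level commit or rollback exposed to the caller. A sketch
of one common pattern (the staging-table idea is an assumption here, not a built-in
Spark feature): write to a staging table, and promote it inside the database only
after the Spark action finishes without throwing.

import java.util.Properties

val props = new Properties()
props.setProperty("user", "username")
props.setProperty("password", "password")
props.setProperty("driver", "oracle.jdbc.driver.OracleDriver")

try {
  // write to a staging table first
  df.write.mode("overwrite")
    .jdbc("jdbc:oracle:thin:@xx.xxx.xxx.xx:1521:dbname", "TARGET_STAGING", props)
  // reaching this point means the Spark action finished without throwing;
  // promote/rename the staging table inside Oracle as the effective "commit"
} catch {
  case e: Exception =>
    // the staging table may hold partial data; drop or truncate it here
    throw e
}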


Re: Spark History Server NOT showing Jobs with Hortonworks

2016-02-18 Thread Divya Gehlot
Hi Sutanu ,

When you run your spark-shell,
you will see lines like the below in your console:

16/02/18 21:43:53 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4041
16/02/18 21:43:53 INFO Utils: Successfully started service 'SparkUI' on
port 4041.
16/02/18 21:43:54 INFO SparkUI: Started SparkUI at http://xx.xx.xx.xxx:4041

In my case the UI started on port 4041 instead of the default port.

Hope this helps.

Thanks,
Divya



On 19 February 2016 at 07:09, Mich Talebzadeh  wrote:

> Is 4040 port used in your host? It should be default
>
>
>
> Example
>
>
>
> *netstat -plten|grep 4040*
>
>
>
> tcp0  0 :::4040
> :::*LISTEN  1009   42748209   *22778*/java
>
>
>
> *ps -ef|grep 22778*
>
>
>
> hduser   22778 22770  0 08:34 pts/100:01:18 /usr/java/latest/bin/java
> -cp
> /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar:/usr/lib/spark-1.5.2-bin-hadoop2.6/conf/:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/lib/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/home/hduser/hadoop-2.6.0/etc/hadoop/
> -Dscala.usejavacp=true -Xms1G -Xmx1G -XX:MaxPermSize=256m
> org.apache.spark.deploy.SparkSubmit --master spark://50.140.197.217:7077
> --class org.apache.spark.repl.Main --name Spark shell spark-shell
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this
> message shall not be understood as given or endorsed by Peridale Technology
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is
> the responsibility of the recipient to ensure that this email is virus
> free, therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>
>
>
>
>
> *From:* Sutanu Das [mailto:sd2...@att.com]
> *Sent:* 18 February 2016 22:58
> *To:* Mich Talebzadeh ; user@spark.apache.org
>
> *Subject:* RE: Spark History Server NOT showing Jobs with Hortonworks
>
>
>
> Hi Mich, Community - Do I need to specify it in the properties file in my
> spark-submit ?
>
>
>
> *From:* Mich Talebzadeh [mailto:m...@peridale.co.uk ]
>
> *Sent:* Thursday, February 18, 2016 4:28 PM
> *To:* Sutanu Das; user@spark.apache.org
> *Subject:* RE: Spark History Server NOT showing Jobs with Hortonworks
>
>
>
> The jobs are normally shown under :4040/jobs/ in a normal set up
> not using any vendor’s flavoiur
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this
> message shall not be understood as given or endorsed by Peridale Technology
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is
> the responsibility of the recipient to ensure that this email is virus
> free, therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>
>
>
>
>
> *From:* Sutanu Das [mailto:sd2...@att.com ]
> *Sent:* 18 February 2016 22:22
> *To:* user@spark.apache.org
> *Subject:* Spark History Server NOT showing Jobs with Hortonworks
>
>
>
> Hi Community,
>
>
>
> Challenged with Spark issues with *Hortonworks*  (HDP 2.3.2_Spark 1.4.1)
> – The Spark History Server is NOT showing the Spark Running Jobs in Local
> Mode
>
>
>
> The local-host:4040/app/v1 is ALSO not working
>
>
>
> How can I look at my local Spark job?
>
>
>
>
>
> # Generated by Apache Ambari. Fri Feb  5 00:37:06 2016
>
>
>
> spark.history.kerberos.keytab none
>
> spark.history.kerberos.principal none
>
> spark.history.provider
> org.apache.spark.deploy.yarn.history.YarnHistoryProvider
>
> spark.history.ui.port 18080
>
> spark.yarn.containerLauncherMaxThreads 25
>
> spark.yarn.driver.memoryOverhead 2048
>
> spark.yarn.executor.memoryOverhead 2048
>
> spark.yarn.historyServer.address has-dal-0001.corp.wayport.net:18080
>
> spark.yarn.max.executor.failures 3
>
> spark.yarn.preserve.staging.files false
>
> spark.yarn.queue default
>
> spark.yarn.scheduler.heartbeat.interval-ms 5000
>
> spark.yarn.services 

Read files dynamically having different schema under one parent directory + scala + Spakr 1.5,2

2016-02-19 Thread Divya Gehlot
Hi,
I have a use case where I have one parent directory.

The file structure looks like:
hdfs:///TestDirectory/spark1/part files( created by some spark job )
hdfs:///TestDirectory/spark2/ part files (created by some spark job )

spark1 and spark 2 has different schema

like spark 1  part files schema
carname model year

Spark2 part files schema
carowner city  carcost


As these spark 1 and spark2 directory gets created dynamically
can have spark3 directory with different schema

My requirement is to read the parent directory, list the subdirectories,
and create a dataframe for each subdirectory.

I am not able to work out how I can list the subdirectories under the parent directory and
dynamically create the dataframes.

Thanks,
Divya


Re: Read files dynamically having different schema under one parent directory + scala + Spakr 1.5,2

2016-02-20 Thread Divya Gehlot
Hi,
@Umesh: Your understanding is partially correct as per my requirement.
The idea which I am trying to implement is below.
Steps which I am trying to follow
(not sure how feasible it is, I am a newbie to Spark and Scala):
1.List all the files under parent directory
  hdfs :///Testdirectory/
As list
For example : val listsubdirs =(subdir1,subdir2...subdir.n)
Iterate through this list
for(subdir <-listsubdirs){
val df ="df"+subdir
df= read it using spark csv package using custom schema

}
Will get dataframes equal to subdirs

Now I am stuck at the first step itself.
How do I list the directories and put them in a list?

Hope you understood my issue now.
Thanks,
Divya
On Feb 19, 2016 6:54 PM, "UMESH CHAUDHARY" <umesh9...@gmail.com> wrote:

> If I understood correctly, you can have many sub-dirs under 
> *hdfs:///TestDirectory
> *and and you need to attach a schema to all part files in a sub-dir.
>
> 1) I am assuming that you know the sub-dirs names :
>
> For that, you need to list all sub-dirs inside *hdfs:///TestDirectory
> *using Scala, iterate over sub-dirs
> foreach sub-dir in the list
> read the partfiles , identify and attach schema respective to that
> sub-directory.
>
> 2) If you don't know the sub-directory names:
> You need to store schema somewhere inside that sub-directory and read
> it in iteration.
>
> On Fri, Feb 19, 2016 at 3:44 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>> I have a use case ,where I have one parent directory
>>
>> File stucture looks like
>> hdfs:///TestDirectory/spark1/part files( created by some spark job )
>> hdfs:///TestDirectory/spark2/ part files (created by some spark job )
>>
>> spark1 and spark 2 has different schema
>>
>> like spark 1  part files schema
>> carname model year
>>
>> Spark2 part files schema
>> carowner city  carcost
>>
>>
>> As these spark 1 and spark2 directory gets created dynamically
>> can have spark3 directory with different schema
>>
>> M requirement is to read the parent directory and list sub drectory
>> and create dataframe for each subdirectory
>>
>> I am not able to get how can I list subdirectory under parent directory
>> and dynamically create dataframes.
>>
>> Thanks,
>> Divya
>>
>>
>>
>>
>>
>


[Example] : read custom schema from file

2016-02-21 Thread Divya Gehlot
Hi,
Can anybody help me by providing an example of how we can read the schema of the
data set from a file?



Thanks,
Divya


Error :Type mismatch error when passing hdfs file path to spark-csv load method

2016-02-21 Thread Divya Gehlot
Hi,
I am trying to dynamically create DataFrames by reading the subdirectories under a
parent directory.

My code looks like

> import org.apache.spark._
> import org.apache.spark.sql._
> val hadoopConf = new org.apache.hadoop.conf.Configuration()
> val hdfsConn = org.apache.hadoop.fs.FileSystem.get(new
> java.net.URI("hdfs://xxx.xx.xx.xxx:8020"), hadoopConf)
> hdfsConn.listStatus(new
> org.apache.hadoop.fs.Path("/TestDivya/Spark/ParentDir/")).foreach{
> fileStatus =>
>val filePathName = fileStatus.getPath().toString()
>val fileName = fileStatus.getPath().getName().toLowerCase()
>var df =  "df"+fileName
>df =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").option("inferSchema", "true").load(filePathName)
> }


getting below error

> :35: error: type mismatch;
>  found   : org.apache.spark.sql.DataFrame
>  required: String
>  df =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").option("inferSchema", "true").load(filePathName)


Am I missing something ?

Would really appreciate the help .


Thanks,
Divya
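
The type mismatch comes from declaring df as a String ("df" + fileName) and then
assigning a DataFrame to it. A sketch of one way around it: don't build variable
names from strings; collect the per-directory DataFrames into a Map keyed by the
sub-directory name instead.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.DataFrame
import scala.collection.mutable

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val fs = FileSystem.get(new java.net.URI("hdfs://xxx.xx.xx.xxx:8020"), hadoopConf)

val dfByDir = mutable.Map[String, DataFrame]()

fs.listStatus(new Path("/TestDivya/Spark/ParentDir/"))
  .filter(_.isDirectory)                       // only the spark1/spark2/... sub-directories
  .foreach { status =>
    val name = status.getPath.getName.toLowerCase
    dfByDir(name) = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true").option("inferSchema", "true")
      .load(status.getPath.toString)
  }

dfByDir.foreach { case (dir, df) => println(dir); df.printSchema() }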


[ERROR]: Spark 1.5.2 + Hbase 1.1 + Hive 1.2 + HbaseIntegration

2016-02-29 Thread Divya Gehlot
Hi,
I am getting an error when I try to connect to a Hive table (which was
created through HBaseIntegration) in Spark.

Steps I followed :
*Hive Table creation code  *:
CREATE EXTERNAL TABLE IF NOT EXISTS TEST(NAME STRING,AGE INT)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,0:AGE")
TBLPROPERTIES ("hbase.table.name" = "TEST",
"hbase.mapred.output.outputtable" = "TEST");


*DESCRIBE TEST ;*
col_namedata_typecomment
namestring from deserializer
age   int from deserializer


*Spark Code :*
import org.apache.spark._
import org.apache.spark.sql._

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("from TEST SELECT  NAME").collect.foreach(println)


*Starting Spark shell*
spark-shell --jars
/usr/hdp/2.3.4.0-3485/hive/lib/guava-14.0.1.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-client.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-common.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-protocol.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hive/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/2.3.4.0-3485/hive/lib/zookeeper-3.4.6.2.3.4.0-3485.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar
--driver-class-path
/usr/hdp/2.3.4.0-3485/hive/lib/guava-14.0.1.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-client.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-common.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-protocol.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hive/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/2.3.4.0-3485/hive/lib/zookeeper-3.4.6.2.3.4.0-3485.jar,/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler.jar,/usr/hdp/2.3.4.0-3485/hbase/lib/hbase-server.jar
--packages com.databricks:spark-csv_2.10:1.3.0  --master yarn-client -i
/TestDivya/Spark/InstrumentCopyToHDFSHive.scala

*Stack Trace* :

Stack SQL context available as sqlContext.
> Loading /TestDivya/Spark/InstrumentCopyToHDFSHive.scala...
> import org.apache.spark._
> import org.apache.spark.sql._
> 16/02/29 23:09:29 INFO HiveContext: Initializing execution hive, version
> 1.2.1
> 16/02/29 23:09:29 INFO ClientWrapper: Inspected Hadoop version:
> 2.7.1.2.3.4.0-3485
> 16/02/29 23:09:29 INFO ClientWrapper: Loaded
> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version
> 2.7.1.2.3.4.0-3485
> 16/02/29 23:09:29 INFO HiveContext: default warehouse location is
> /user/hive/warehouse
> 16/02/29 23:09:29 INFO HiveContext: Initializing HiveMetastoreConnection
> version 1.2.1 using Spark classes.
> 16/02/29 23:09:29 INFO ClientWrapper: Inspected Hadoop version:
> 2.7.1.2.3.4.0-3485
> 16/02/29 23:09:29 INFO ClientWrapper: Loaded
> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version
> 2.7.1.2.3.4.0-3485
> 16/02/29 23:09:30 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 16/02/29 23:09:30 INFO metastore: Trying to connect to metastore with URI
> thrift://ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal:9083
> 16/02/29 23:09:30 INFO metastore: Connected to metastore.
> 16/02/29 23:09:30 WARN DomainSocketFactory: The short-circuit local reads
> feature cannot be used because libhadoop cannot be loaded.
> 16/02/29 23:09:31 INFO SessionState: Created local directory:
> /tmp/1bf53785-f7c8-406d-a733-a5858ccb2d16_resources
> 16/02/29 23:09:31 INFO SessionState: Created HDFS directory:
> /tmp/hive/hdfs/1bf53785-f7c8-406d-a733-a5858ccb2d16
> 16/02/29 23:09:31 INFO SessionState: Created local directory:
> /tmp/hdfs/1bf53785-f7c8-406d-a733-a5858ccb2d16
> 16/02/29 23:09:31 INFO SessionState: Created HDFS directory:
> /tmp/hive/hdfs/1bf53785-f7c8-406d-a733-a5858ccb2d16/_tmp_space.db
> hiveContext: org.apache.spark.sql.hive.HiveContext =
> org.apache.spark.sql.hive.HiveContext@10b14f32
> 16/02/29 23:09:32 INFO ParseDriver: Parsing command: from TEST SELECT  NAME
> 16/02/29 23:09:32 INFO ParseDriver: Parse Completed
> 16/02/29 23:09:33 INFO deprecation: mapred.map.tasks is deprecated.
> Instead, use mapreduce.job.maps
> 16/02/29 23:09:33 INFO MemoryStore: ensureFreeSpace(468352) called with
> curMem=0, maxMem=556038881
> 16/02/29 23:09:33 INFO MemoryStore: Block broadcast_0 stored as values in
> memory (estimated size 457.4 KB, free 529.8 MB)
> 16/02/29 23:09:33 INFO MemoryStore: ensureFreeSpace(49454) called with
> curMem=468352, maxMem=556038881
> 16/02/29 23:09:33 INFO MemoryStore: Block broadcast_0_piece0 stored as
> bytes in memory (estimated size 48.3 KB, free 529.8 MB)
> 16/02/29 23:09:33 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> memory on xxx.xx.xx.xxx:37784 (size: 48.3 KB, free: 530.2 MB)
> 16/02/29 23:09:33 INFO SparkContext: Created broadcast 0 from collect at
> :30
> 16/02/29 23:09:34 INFO HBaseStorageHandler: 

Spark with Yarn Client

2016-03-11 Thread Divya Gehlot
Hi,
I am trying to understand the behaviour/configuration of Spark with the YARN
client on a Hadoop cluster.
Can somebody help me, or point me to documents/blogs/books which give a deeper
understanding of the above two?
Thanks,
Divya


append rows to dataframe

2016-03-13 Thread Divya Gehlot
Hi,

Please bear with me for asking such a naive question.
I have a list of conditions (dynamic SQLs) sitting in an HBase table.
I need to iterate through those dynamic SQLs and add the data to dataframes.
As we know dataframes are immutable, so when I iterate in a for loop as
shown below I get only the last dynamic SQL's result set.

var dffiltered : DataFrame = sqlContext.emptyDataFrame
 for ( i <- 0 to (dfFilterSQLs.length - 1)) {
 println("Condition="+dfFilterSQLs(i))
 dffiltered =
dfresult.filter(dfFilterSQLs(i)).select("Col1","Col2","Col3","Col4","Col5")
  dffiltered.show
  }


How can I keep appending data to the dataframe and get a final result
containing the rows from all the SQL conditions?

Thanks in advance for the help.

Thanks,
Divya
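
A sketch of one way to keep every slice (it reuses dfresult and dfFilterSQLs from
above): start from an empty DataFrame with the right schema and re-assign the result
of unionAll each time, instead of re-assigning the filter result itself.

import org.apache.spark.sql.{DataFrame, Row}

val cols = Seq("Col1", "Col2", "Col3", "Col4", "Col5")
val schemaL = dfresult.select(cols.head, cols.tail: _*).schema

var dffiltered: DataFrame = sqlContext.createDataFrame(sc.emptyRDD[Row], schemaL)

for (sqlCond <- dfFilterSQLs) {
  println("Condition=" + sqlCond)
  // the union result, not the filtered slice, becomes the new accumulator
  dffiltered = dffiltered.unionAll(dfresult.filter(sqlCond).select(cols.head, cols.tail: _*))
}

dffiltered.show()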


[Error] : dynamically union All + adding new column

2016-03-19 Thread Divya Gehlot
Hi,
I am dynamically doing union all and adding new column too

val dfresult =
> dfAcStamp.select("Col1","Col1","Col3","Col4","Col5","Col6","col7","col8","col9")
> val schemaL = dfresult.schema
> var dffiltered = sqlContext.createDataFrame(sc.emptyRDD[Row], schemaL)
> for ((key,values) <- lcrMap) {
> if(values(4) != null){
>  println("Condition="+values(4))
>  val renameRepId = values(0)+"REP_ID"
>  dffiltered.printSchema
> dfresult.printSchema
>  dffiltered =
> dffiltered.unionAll(dfresult.withColumn(renameRepId,lit(values(3))).drop("Col9").select("Col1","Col1","Col3","Col4","Col5","Col6","Col7","Col8","Col9").where(values(4))).distinct()


> }
> }



when I am printing the schema
dfresult
root
 |-- Col1: date (nullable = true)
 |-- Col2: date (nullable = true)
 |-- Col3: string (nullable = false)
 |-- Col4: string (nullable = false)
 |-- Col5: string (nullable = false)
 |-- Col6: string (nullable = true)
 |-- Col7: string (nullable = true)
 |-- Col8: string (nullable = true)
 |-- Col9: null (nullable = true)


dffiltered Schema
root
 |-- Col1: date (nullable = true)
 |-- Col2: date (nullable = true)
 |-- Col3: string (nullable = false)
 |-- Col4: string (nullable = false)
 |-- Col5: string (nullable = false)
 |-- Col6: string (nullable = true)
 |-- Col7: string (nullable = true)
 |-- Col8: string (nullable = true)
 |-- Col9: null (nullable = true)


Although it is printing the same schema, when I do the unionAll it gives me the
below error:
org.apache.spark.sql.AnalysisException: Union can only be performed on
tables with the same number of columns, but the left table has 9 columns
and the right has 8;

Could somebody help me by pointing out my mistake?


Thanks,


[Spark-1.5.2]Column renaming with withColumnRenamed has no effect

2016-03-19 Thread Divya Gehlot
Hi,
I am adding a new column and renaming it at the same time, but the renaming
doesn't have any effect.

dffiltered =
> dffiltered.unionAll(dfresult.withColumn("Col1",lit("value1").withColumn("Col2",lit("value2")).cast("int")).withColumn("Col3",lit("values3")).withColumnRenamed("Col1","Col1Rename").drop("Col1")


Can anybody help me pointing out my mistake ?

Thanks,
Divya
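
One likely cause (an assumption, since the snippet above is hard to parse): unionAll
takes its column names from the DataFrame it is called on, so a withColumnRenamed
applied to the right-hand side is not visible in the union's schema. A sketch that
renames after the union instead, assuming both sides already have matching columns:

import org.apache.spark.sql.functions.lit

val rightSide = dfresult
  .withColumn("Col1", lit("value1"))
  .withColumn("Col2", lit("value2"))
  .withColumn("Col3", lit("values3"))

val unioned = dffiltered.unionAll(rightSide)

// rename on the DataFrame whose column names actually survive the union
val renamed = unioned.withColumnRenamed("Col1", "Col1Rename")
renamed.printSchema()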


convert row to map of key as int and values as arrays

2016-03-15 Thread Divya Gehlot
Hi,
As I can't add columns from another dataframe,
I am planning to convert my row columns to a map of keys and arrays.
As I am new to Scala and Spark,
I am trying it like below:

// create an empty map
import scala.collection.mutable.{ArrayBuffer => mArrayBuffer}
var map = Map[Int,mArrayBuffer[Any]]()


def addNode(key: String, value:ArrayBuffer[Any] ) ={
nodes += (key -> (value :: (nodes get key getOrElse Nil)))
 }

  var rows = dfLnItmMappng.collect()
rows.foreach(r =>  addNode(r.getInt(2),
(r.getString(1),r.getString(3),r.getString(4),r.getString(5
for ((k,v) <- rows)
printf("key: %s, value: %s\n", k, v)

But I am getting below error :
import scala.collection.mutable.{ArrayBuffer=>mArrayBuffer}
map:
scala.collection.immutable.Map[Int,scala.collection.mutable.ArrayBuffer[Any]]
= Map()
:28: error: not found: value nodes
nodes += (key -> (value :: (nodes get key getOrElse Nil)))
^
:27: error: not found: type ArrayBuffer
   def addNode(key: String, value:ArrayBuffer[Any] ) ={



If anybody knows a better method to add columns from another
dataframe, please help by letting me know.


Thanks,
Divya
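
A sketch with the two compile errors fixed: the function has to update the map that
was actually declared (there is no 'nodes' value in the snippet above), and the
buffer type must be referred to by the alias that was imported (mArrayBuffer).

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer => mArrayBuffer}

val nodes = mutable.Map[Int, mArrayBuffer[Any]]()

def addNode(key: Int, values: Seq[Any]): Unit = {
  // create the buffer for the key on first use, then append the values
  val buf = nodes.getOrElseUpdate(key, mArrayBuffer[Any]())
  buf ++= values
}

dfLnItmMappng.collect().foreach { r =>
  addNode(r.getInt(2), Seq(r.getString(1), r.getString(3), r.getString(4), r.getString(5)))
}

for ((k, v) <- nodes) printf("key: %s, value: %s\n", k, v)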


[How To :]Custom Logging of Spark Scala scripts

2016-03-14 Thread Divya Gehlot
Hi,
Can somebody point out how I can configure custom logs for my Spark (Scala)
scripts,
so that I can see at which step my script failed and why?


Thanks,
Divya
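
A minimal sketch using the log4j that already ships with Spark (the logger name and
file path are made up): get a named logger in the script and log each step, so the
failing step and the exception show up in the driver log.

import org.apache.log4j.{Level, Logger}

val log = Logger.getLogger("MySparkScript")
log.setLevel(Level.INFO)

log.info("starting load step")
try {
  val df = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true").load("/TestDivya/Spark/cars.csv")
  log.info(s"load step done, ${df.count()} rows")
} catch {
  case e: Exception =>
    log.error("load step failed", e)
    throw e
}

This covers the driver side only; executor logs still end up in the YARN container
logs, and levels/appenders for both can be tuned through a custom log4j.properties
(for example the one under the Spark conf directory).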


[ASK]:Dataframe number of column limit in Saprk 1.5.2

2016-04-12 Thread Divya Gehlot
Hi,
I would like to know: does the Spark Dataframe API have a limit on the
number of columns that can be created?

Thanks,
Divya


[HELP:]Save Spark Dataframe in Phoenix Table

2016-04-07 Thread Divya Gehlot
Hi,
I have a Hortonworks Hadoop cluster with the below configuration:
Spark 1.5.2
HBASE 1.1.x
Phoenix 4.4

I am able to connect to Phoenix through a JDBC connection and able to read
the Phoenix tables.
But while writing the data back to a Phoenix table
I am getting the below error:

org.apache.spark.sql.AnalysisException:
org.apache.phoenix.spark.DefaultSource does not allow user-specified
schemas.;

Can anybody help in resolving the above error, or suggest any other solution for
saving Spark Dataframes to Phoenix?

Would really appreciate the help.

Thanks,
Divya
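
A sketch of the write path as documented for the phoenix-spark plugin (option names
per Phoenix 4.4; the table name and ZooKeeper quorum are placeholders). The key
point is not to pass an explicit schema to the Phoenix data source; the plugin
derives it from the target table, and SaveMode.Overwrite is required:

import org.apache.spark.sql.SaveMode

df.write
  .format("org.apache.phoenix.spark")
  .mode(SaveMode.Overwrite)
  .option("table", "TARGET_TABLE")
  .option("zkUrl", "zkhost1,zkhost2,zkhost3:2181")
  .save()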


Re: declare constant as date

2016-03-21 Thread Divya Gehlot
Oh my my I am so silly

I can declare it as string and cast it to date

My apologies for Spamming the mailing list.

Thanks,
Divya

On 21 March 2016 at 14:51, Divya Gehlot <divya.htco...@gmail.com> wrote:

> Hi,
> In Spark 1.5.2
> Do we have any utiility which converts a constant value as shown below
> orcan we declare a date variable like val start_date :Date = "2015-03-02"
>
> val start_date = "2015-03-02" toDate
> like how we convert to toInt ,toString
> I searched for it but  couldnt find it
>
>
> Thanks,
> Divya
>


Get the number of days dynamically in with Column

2016-03-20 Thread Divya Gehlot
I have a time stamping table which has data like
No of Days   ID
1            1D
2            2D



and so on till 30 days

I have another dataframe with
a start date and an end date.
I need to get the difference between these two dates, look up the ID from the
time stamping table, and add it with withColumn.

The tedious solution is


val dfTimeStamping = df.withColumn("ID",when(Diff between Start date and
Enddate ,"1D").when(Diff between Start date and Enddate ,"2D")).. have to
do till 30 days .

How can I do it dynamically ?


Thanks,
Divya
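
A sketch of a join-based alternative to 30 chained when() calls (the column names
NoOfDays and ID are assumed from the description above): compute the day difference
with datediff and join it against the time stamping table.

import org.apache.spark.sql.functions.datediff

val withDays = df.withColumn("NoOfDays", datediff(df("end_date"), df("Start_date")))

val tagged = withDays
  .join(dfTimeStamping, withDays("NoOfDays") === dfTimeStamping("NoOfDays"), "left_outer")
  .drop(dfTimeStamping("NoOfDays"))          // keep only the ID column from the lookup

tagged.show()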


declare constant as date

2016-03-21 Thread Divya Gehlot
Hi,
In Spark 1.5.2
do we have any utility which converts a constant value as shown below,
or can we declare a date variable like val start_date: Date = "2015-03-02"?

val start_date = "2015-03-02" toDate
like how we convert with toInt or toString?
I searched for it but couldn't find it.


Thanks,
Divya


[Spark -1.5.2]Dynamically creation of caseWhen expression

2016-03-23 Thread Divya Gehlot
Hi,
I have a map collection .
I am trying to build when condition based on the key values .
Like df.withColumn("ID", when( condition with map keys ,values of map )

How can I do that dynamically.
Currently I am iterating over the keysIterator and getting the values:
val keys = myMap.keysIterator.toArray
Like below
df.withColumn("ID",when(condition on keys(0),lit(myMap get
keys(0)).when(condition on keys(1),lit(myMap get keys(1)).
when(condition on keys(2),lit(myMap get keys(3)).otherwise("value not
found"))

How can I build the above expression dynamically
Like for (key <-keys){
when(condition on key ,lit(myMap get key)
}
Would really appreciate the help.

Thanks,
Divya
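
A sketch of building the chained when(...) with foldLeft; conditionFor stands in for
however a key is turned into a Column predicate, since that depends on the actual
data (df and myMap are from the description above).

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{when, lit}

def conditionFor(key: Int): Column = df("NoOfDays") === key     // assumed predicate

val keys = myMap.keys.toSeq
val caseExpr = keys.tail
  .foldLeft(when(conditionFor(keys.head), lit(myMap(keys.head)))) { (acc, key) =>
    acc.when(conditionFor(key), lit(myMap(key)))
  }
  .otherwise("value not found")

val result = df.withColumn("ID", caseExpr)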


find the matching and get the value

2016-03-22 Thread Divya Gehlot
Hi,
I am using Spark1.5.2
My requirement is as below

df.withColumn("NoOfDays",lit(datediff(df("Start_date"),df("end_date"


Now I have to add one more column where my datediff(Start_date, end_date)
should match the map keys.

Map looks like MyMap(1->1D,2->2D,3->3M,4->4W)

I want to do something like this

> val
> condition= MyMap.contains(lit(datediff(df("END_DATE"),df("START_DATE"
> val geId =MyMap(datediff(df("END_DATE"),df("START_DATE")))
> df.withColumn("AddColumn",when(cond,lit(getId)))


Is it possible ?

What am I missing here?
I am a beginner in Scala and Spark.

Would really appreciate the help.

Thanks,
Divya


Spark 1.5.2 -Better way to create custom schema

2016-03-04 Thread Divya Gehlot
Hi ,
I have a data set in HDFS .
Is there any better way to define the custom schema for a data set having
more than 100 fields of different data types?

Thanks,
Divya


[Issue:]Getting null values for Numeric types while accessing hive tables (Registered on Hbase,created through Phoenix)

2016-03-03 Thread Divya Gehlot
Hi,
I am registering hive table on Hbase

CREATE EXTERNAL TABLE IF NOT EXISTS TEST(NAME STRING,AGE INT)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,0:AGE")
TBLPROPERTIES ("hbase.table.name" = "TEST",
"hbase.mapred.output.outputtable" = "TEST");

When I try to access the data I get null for age, as it's a numeric
field.

test.name test.age
John  null
Paul  null
Peter null


Versions I am using:
Phoenix 4.4
HBase 1.1.2
Hive 1.2
Has anybody faced this issue?


Would really appreciate the help.


Thanks,
Divya


Spark 1.5.2 - Read custom schema from file

2016-03-03 Thread Divya Gehlot
Hi,
I have defined a custom schema as shown below :

val customSchema = StructType(
> StructField("year", IntegerType, true),
> StructField("make", StringType, true),
> StructField("model", StringType, true),
> StructField("comment", StringType, true),

StructField("blank", StringType, true))


Is there any way I can read it from a file instead of defining it in the Spark job file?
I am using spark-csv to read my data file:

 val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.schema(customSchema)
.load("cars.csv")val selectedData = df.select("year", "model")
selectedData.write
.format("com.databricks.spark.csv")
.option("header", "true")
.save("newcars.csv")
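
A sketch of one way to keep the schema outside the job (the file name, path and the
name:type format are assumptions): put the column list in a small text file, parse
it on the driver, and build the StructType from it. Alternatively, since StructType
has a json method, a schema can also be saved with customSchema.json and rebuilt
with DataType.fromJson.

import org.apache.spark.sql.types._
import scala.io.Source

def toDataType(name: String): DataType = name.trim.toLowerCase match {
  case "int" | "integer" => IntegerType
  case "double"          => DoubleType
  case "date"            => DateType
  case _                 => StringType
}

// schema file content, e.g.:  year:int,make:string,model:string,comment:string,blank:string
val schemaLine = Source.fromFile("/path/to/cars_schema.txt").getLines().next()

val customSchema = StructType(schemaLine.split(",").map { field =>
  val Array(name, typ) = field.split(":")
  StructField(name.trim, toDataType(typ), true)
})

val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(customSchema)
  .load("cars.csv")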


Steps to Run Spark Scala job from Oozie on EC2 Hadoop cluster

2016-03-07 Thread Divya Gehlot
Hi,

Could somebody help me by providing the steps /redirect me  to
blog/documentation on how to run Spark job written in scala through Oozie.

Would really appreciate the help.



Thanks,
Divya


[Spark 1.5.2]: Iterate through Dataframe columns and put it in map

2016-03-02 Thread Divya Gehlot
Hi,

I need to iterate through the columns in a dataframe based on a certain condition
and put them in a map.

Dataset
Column1  Column2
Car   Model1
Bike   Model2
Car Model2
Bike   Model 2

I want to iterate through the above dataframe and put it in a map where Car is
the key and Model1 and Model2 are the values.


Thanks,
Regards,
Divya
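
A sketch of one way to build that map (fine for small lookup data only, since it
collects to the driver): pair up the two columns, group by the first, and collect
as a map.

val byKey: Map[String, Seq[String]] = df.rdd
  .map(r => (r.getString(0), r.getString(1)))   // (Column1, Column2) pairs
  .groupByKey()
  .mapValues(_.toSeq.distinct)
  .collectAsMap()
  .toMap

// e.g. byKey("Car")  -> Seq("Model1", "Model2")
//      byKey("Bike") -> Seq("Model2", "Model 2")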


[Error]Run Spark job as hdfs user from oozie workflow

2016-03-09 Thread Divya Gehlot
Hi,
I have a non-secure Hadoop 2.7.2 cluster on EC2 with Spark 1.5.2.
I am submitting my Spark Scala script through a shell script using an Oozie
workflow.
I am submitting the job as the hdfs user, but it runs as user = "yarn", so all
the output gets stored under the user/yarn directory only.

When I googled, I found YARN-2424
for non-secure clusters.
I changed the settings as per those docs,

and when I ran my Oozie workflow as the hdfs user I got the below error:

Application application_1457494230162_0004 failed 2 times due to AM
Container for appattempt_1457494230162_0004_02 exited with exitCode:
-1000
For more detailed output, check application tracking page:
http://ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal:8088/cluster/app/application_1457494230162_0004Then
,
click on links to logs of each attempt.
Diagnostics: Application application_1457494230162_0004 initialization
failed (exitCode=255) with output: main : command provided 0
main : run as user is hdfs
main : requested yarn user is hdfs
Can't create directory
/hadoop/yarn/local/usercache/hdfs/appcache/application_1457494230162_0004 -
Permission denied
Did not create any app directories
Failing this attempt. Failing the application.

After changing the setting, when I start spark-shell
I get an error saying "Error starting SQLContext - Yarn application has
ended".

Has anybody run into these kinds of issues?
Would really appreciate it if you could guide me to the steps/docs to resolve
it.


Thanks,
Divya


Re: [SQL] Two columns in output vs one when joining DataFrames?

2016-03-28 Thread Divya Gehlot
Hi Jacek ,

The difference is mentioned in the Spark doc itself:

Note that if you perform a self-join using this function without aliasing
the input
* [[DataFrame]]s, you will NOT be able to reference any columns after the
join, since
* there is no way to disambiguate which side of the join you would like to
reference.
*

On 26 March 2016 at 04:19, Jacek Laskowski  wrote:

> Hi,
>
> I've read the note about both columns included when DataFrames are
> joined, but don't think it differentiated between versions of join. Is
> this a feature or a bug that the following session shows one _1 column
> with Seq("_1") and two columns for ===?
>
> {code}
> scala> left.join(right, Seq("_1")).show
> +---+---+---+
> | _1| _2| _2|
> +---+---+---+
> |  1|  a|  a|
> |  2|  b|  b|
> +---+---+---+
>
>
> scala> left.join(right, left("_1") === right("_1")).show
> +---+---+---+---+
> | _1| _2| _1| _2|
> +---+---+---+---+
> |  1|  a|  1|  a|
> |  2|  b|  2|  b|
> +---+---+---+---+
> {code}
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


[Spark-1.5.2]Spark Memory Issue while Saving to HDFS and Pheonix both

2016-04-01 Thread Divya Gehlot
Forgot to mention: I am using the DataFrame API for all the operations instead of SQL.

-- Forwarded message --
From: Divya Gehlot <divya.htco...@gmail.com>
Date: 1 April 2016 at 18:35
Subject: [Spark-1.5.2] Spark Memory Issue while Saving to HDFS and Phoenix
both
To: "user @spark" <user@spark.apache.org>


[image: Mic Drop]
Hi,
I have a 3-node Hortonworks Hadoop cluster on EC2 with
*Hadoop *version 2.7.x
*Spark *version - 1.5.2
*Phoenix *version - 4.4
*Hbase *version 1.1.x

*Cluster Statistics *
Data Node 1
OS: redhat7 (x86_64)
Cores (CPU): 2 (2)
Disk: 20.69GB/99.99GB (20.69% used)
Memory: 7.39GB

Data Node 2
Cores (CPU): 2 (2)
Disk: 20.73GB/99.99GB (20.73% used)
Memory: 7.39GB
Load Avg: 0.00
Heartbeat: a moment ago
Current Version: 2.3.4.0-3485

*NameNode*
Rack: /default-rack
OS: redhat7 (x86_64)
Cores (CPU): 4 (4)
Disk: 32.4GB/99.99GB (32.4% used)
Memory: 15.26GB
Load Avg: 0.78
Heartbeat: a moment ago
Current Version: 2.3.4.0-3485

*Spark Queue Statistics *

> Queue State: RUNNING
> Used Capacity: 0.0%
> Configured Capacity: 100.0%
> Configured Max Capacity: 100.0%
> Absolute Used Capacity: 0.0%
> Absolute Configured Capacity: 100.0%
> Absolute Configured Max Capacity: 100.0%
> Used Resources: <memory:0, vCores:0>
> Num Schedulable Applications: 0
> Num Non-Schedulable Applications: 0
> Num Containers: 0
> Max Applications: 1
> Max Applications Per User: 1
> Max Application Master Resources: <memory:3072, vCores:1>
> Used Application Master Resources: <memory:0, vCores:0>
> Max Application Master Resources Per User: <memory:3072, vCores:1>
> Configured Minimum User Limit Percent: 100%
> Configured User Limit Factor: 1.0
> Accessible Node Labels: *
> Ordering Policy: FifoOrderingPolicy
> Preemption: disabled



I have a Spark Scala script
which does many operations: reading from the DB (Phoenix), inner and
left-outer joins, unionAll, and finally a groupBy, before saving the result
set to Phoenix/HDFS.
I have created almost 20+ DataFrames for the operations mentioned above.

stackTrace :

> 16/04/01 10:11:49 WARN TaskSetManager: Lost task 3.0 in stage 132.4 (TID
> 18401, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal):
> java.lang.OutOfMemoryError: PermGen space
> at sun.misc.Unsafe.defineClass(Native Method)
> at sun.reflect.ClassDefiner.defineClass(ClassDefiner.java:63)
> at
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
> at
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
> at java.security.AccessController.doPrivileged(Native Method)
> at
> sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
> at
> sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
> at
> sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
> at
> java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
> at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
> at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.(ObjectStreamClass.java:468)
> at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
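
On Java 7 (which Spark 1.5.2 typically runs on), a "java.lang.OutOfMemoryError: PermGen space" like the one above is usually addressed by raising the permanent-generation limit on both the driver and the executors. A hedged sketch (the 512m value is illustrative, not tuned for this cluster):

spark-submit \
  --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=512m" \
  --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512m" \
  ... rest of the submit command ...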




For Phoenix, I am getting an error similar to the one below in my stack trace:

>
> $provider.DefaultSource does not allow user-specified schemas


The whole job takes almost 3-4 minutes, and the save itself takes 3-4
minutes, whether the target is Phoenix or HDFS.

Could somebody help me resolve the above-mentioned issue?

Would really appreciate the help.


Thanks,

Divya


Change TimeZone Setting in Spark 1.5.2

2016-03-29 Thread Divya Gehlot
Hi,

The Spark setup is on a Hadoop cluster.
How can I set the Spark timezone to sync with the server timezone?
Any idea?
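
One commonly used approach (a sketch, not verified against 1.5.2/HDP specifically) is to pass the JVM user.timezone property to both the driver and the executors so they all agree with the server; "Asia/Singapore" below is just an illustrative zone ID:

spark-submit \
  --conf "spark.driver.extraJavaOptions=-Duser.timezone=Asia/Singapore" \
  --conf "spark.executor.extraJavaOptions=-Duser.timezone=Asia/Singapore" \
  ... rest of the submit command ...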


Thanks,
Divya


Memory needs when using expensive operations like groupBy

2016-04-13 Thread Divya Gehlot
Hi,
I am using Spark 1.5.2 with Scala 2.10, and my Spark jobs run fine
except one that uses unionAll and a groupBy on multiple columns; it keeps
failing with exit code 143.

Please advise me on options to optimize it.
The one option which I am using now:

--conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=1024m -XX:PermSize=256m" \
--conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=1024m -XX:PermSize=256m" \
--conf spark.yarn.executor.memoryOverhead=1024

I need to know best practices/better ways to optimize the code.
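
Exit code 143 corresponds to SIGTERM, which on YARN usually means the container was killed for exceeding its memory limit. A hedged starting point, with illustrative values rather than numbers tuned for this cluster, is to raise executor memory and overhead and to spread the groupBy shuffle over more partitions:

spark-submit \
  --executor-memory 4g \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --conf spark.sql.shuffle.partitions=400 \
  ... rest of the submit command ...

Persisting any DataFrame that is reused across the unionAll/groupBy steps (df.persist()) can also avoid recomputing the whole lineage several times.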

Thanks,
Divya


Re: Spark DataFrame sum of multiple columns

2016-04-22 Thread Divya Gehlot
An easy way of doing it in PySpark:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))
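
A Scala equivalent, as a sketch (it assumes every existing column is numeric, since Column's + operator is used):

import org.apache.spark.sql.functions.col

// fold "+" across all columns into a single expression
val withTotal = df.withColumn("NewColumn", df.columns.map(col).reduce(_ + _))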


On 22 April 2016 at 11:51, Naveen Kumar Pokala 
wrote:

> Hi,
>
>
>
> Do we have any way to perform Row level operations in spark dataframes.
>
>
>
>
>
> For example,
>
>
>
> I have a dataframe with columns from A,B,C,…Z.. I want to add one more
> column New Column with sum of all column values.
>
>
>
> A   B   C   D   ...   Z    New Column
> 1   2   4   3   ...   26   351
>
>
>
>
>
> Can somebody help me on this?
>
>
>
>
>
> Thanks,
>
> Naveen
>


Re: [Spark 1.5.2]All data being written to only one part file rest part files are empty

2016-04-29 Thread Divya Gehlot
Hi ,

I observed that if I use a subset of the same dataset, or the dataset is
small, it writes to many part files.
As the dataset grows, everything is written to only one part file and the
rest of the part files are empty.
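
A sketch of how one might diagnose this, assuming a hypothetical join key column "joinKey": if most rows share one key value, the joined output will land in a single partition regardless of the partition count; otherwise an explicit repartition of the result can spread it before saving.

import org.apache.spark.sql.functions.desc

// how skewed is the join key?
df1.groupBy("joinKey").count().orderBy(desc("count")).show(20)

// if the keys are not dominated by a single value, repartition the joined result
joined.repartition(50)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/user/hdfs/output")   // illustrative path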


Thanks,
Divya

On 25 April 2016 at 23:15, nguyen duc tuan <newvalu...@gmail.com> wrote:

> Maybe the problem is the data itself. For example, the first dataframe
> might has common keys in only one part of the second dataframe. I think you
> can verify if you are in this situation by repartition one dataframe and
> join it. If this is the true reason, you might see the result distributed
> more evenly.
>
> 2016-04-25 9:34 GMT+07:00 Divya Gehlot <divya.htco...@gmail.com>:
>
>> Hi,
>>
>> After joining two dataframes, saving dataframe using Spark CSV.
>> But all the result data is being written to only one part file whereas
>> there are 200 part files being created, rest 199 part files are empty.
>>
>> What is the cause of uneven partitioning ? How can I evenly distribute
>> the data ?
>> Would really appreciate the help.
>>
>>
>> Thanks,
>> Divya
>>
>
>


Re: Cant join same dataframe twice ?

2016-04-27 Thread Divya Gehlot
When working with DataFrames and using explain() to debug, I observed that
Spark assigns different tag numbers to the same dataframe's columns.
Like in this case:
val df1 = df2.join(df3,"Column1")
The line below throws a missing-columns error:
val df4 = df1.join(df3,"Column2")

For instance, df2 has 2 columns, and they get tagged like df2Col1#4,
df2Col2#5.
df3 has 4 columns, and they get tagged like
df3Col1#6,df3Col2#7,df3Col3#8,df3Col4#9.
After the first join, df1's column tags are
df2Col1#10,df2Col2#11,df3Col1#12,df3Col2#13,df3Col3#14,df3Col4#15.

Now when df1 is joined again with df3, the df3 column tags change to
df2Col1#16,df2Col2#17,df3Col1#18,
df3Col2#19,df3Col3#20,df3Col4#21,df3Col2#23,df3Col3#24,df3Col4#25,

but the join still refers to df3Col1#12 from the previous dataframe, and
that causes the issue.

Thanks,
Divya






On 27 April 2016 at 23:55, Ted Yu <yuzhih...@gmail.com> wrote:

> I wonder if Spark can provide better support for this case.
>
> The following schema is not user friendly (shown previsouly):
>
> StructField(b,IntegerType,false), StructField(b,IntegerType,false)
>
> Except for 'select *', there is no way for user to query any of the two
> fields.
>
> On Tue, Apr 26, 2016 at 10:17 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Based on my example, how about renaming columns?
>>
>> val df1 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>> val df2 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>> val df3 = df1.join(df2, "a").select($"a", df1("b").as("1-b"),
>> df2("b").as("2-b"))
>> val df4 = df3.join(df2, df3("2-b") === df2("b"))
>>
>> // maropu
>>
>> On Wed, Apr 27, 2016 at 1:58 PM, Divya Gehlot <divya.htco...@gmail.com>
>> wrote:
>>
>>> Correct Takeshi
>>> Even I am facing the same issue .
>>>
>>> How to avoid the ambiguity ?
>>>
>>>
>>> On 27 April 2016 at 11:54, Takeshi Yamamuro <linguin@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I tried;
>>>> val df1 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>>>> val df2 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
>>>> val df3 = df1.join(df2, "a")
>>>> val df4 = df3.join(df2, "b")
>>>>
>>>> And I got; org.apache.spark.sql.AnalysisException: Reference 'b' is
>>>> ambiguous, could be: b#6, b#14.;
>>>> If same case, this message makes sense and this is clear.
>>>>
>>>> Thought?
>>>>
>>>> // maropu
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Apr 27, 2016 at 6:09 AM, Prasad Ravilla <pras...@slalom.com>
>>>> wrote:
>>>>
>>>>> Also, check the column names of df1 ( after joining df2 and df3 ).
>>>>>
>>>>> Prasad.
>>>>>
>>>>> From: Ted Yu
>>>>> Date: Monday, April 25, 2016 at 8:35 PM
>>>>> To: Divya Gehlot
>>>>> Cc: "user @spark"
>>>>> Subject: Re: Cant join same dataframe twice ?
>>>>>
>>>>> Can you show us the structure of df2 and df3 ?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Apr 25, 2016 at 8:23 PM, Divya Gehlot <divya.htco...@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am using Spark 1.5.2 .
>>>>>> I have a use case where I need to join the same dataframe twice on
>>>>>> two different columns.
>>>>>> I am getting error missing Columns
>>>>>>
>>>>>> For instance ,
>>>>>> val df1 = df2.join(df3,"Column1")
>>>>>> Below throwing error missing columns
>>>>>> val df 4 = df1.join(df3,"Column2")
>>>>>>
>>>>>> Is the bug or valid scenario ?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Divya
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


getting ClassCastException when calling UDF

2016-04-27 Thread Divya Gehlot
Hi,
I am using Spark 1.5.2 and defined the UDF below:

import org.apache.spark.sql.functions.udf
> val myUdf  = (wgts : Int , amnt :Float) => {
> (wgts*amnt)/100.asInstanceOf[Float]
> }
>



val df2 = df1.withColumn("WEIGHTED_AMOUNT",callUDF(udfcalWghts,
FloatType,col("RATE"),col("AMOUNT")))

In my schema, RATE is IntegerType and AMOUNT is FloatType.

I am getting the error below:

> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 106 in stage 89.0 failed 4 times, most recent failure: Lost task 106.3 in
> stage 89.0 (TID 7735, ip-xx-xx-xx-xxx.ap-southeast-1.compute.internal):
> java.lang.ClassCastException: java.lang.Double cannot be cast to
> java.lang.Float
> at scala.runtime.BoxesRunTime.unboxToFloat(BoxesRunTime.java:114)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(...)

(see also: https://github.com/EclairJS/eclairjs-nashorn/issues/3)


Can somebody help me with the resolution ?
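
A sketch of one likely fix (calWghts is a hypothetical name): the boxed value reaching the UDF at runtime is a java.lang.Double, so declaring the parameter as Double (or casting the column) avoids the unboxToFloat cast:

import org.apache.spark.sql.functions.{col, udf}

val calWghts = udf { (wgts: Int, amnt: Double) => (wgts * amnt) / 100.0 }

val df2 = df1.withColumn("WEIGHTED_AMOUNT",
  calWghts(col("RATE"), col("AMOUNT").cast("double")))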











Thanks,
Divya


Re: removing header from csv file

2016-04-26 Thread Divya Gehlot
Yes, you can remove the header by removing the first row.

You can use first() or head() to get it and then filter it out.
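
A minimal sketch without SQLContext or spark-csv, using the core RDD API (the path is illustrative):

val lines  = sc.textFile("hdfs:///user/hdfs/people.csv")
val header = lines.first()                 // grab the header line
val data   = lines.filter(_ != header)     // drop every line equal to the header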


Thanks,
Divya

On 27 April 2016 at 13:24, Ashutosh Kumar  wrote:

> I see there is a library spark-csv which can be used for removing header
> and processing of csv files. But it seems it works with sqlcontext only. Is
> there a way to remove header from csv files without sqlcontext ?
>
> Thanks
> Ashutosh
>


Re: Cant join same dataframe twice ?

2016-04-26 Thread Divya Gehlot
Correct Takeshi
Even I am facing the same issue .

How to avoid the ambiguity ?


On 27 April 2016 at 11:54, Takeshi Yamamuro <linguin@gmail.com> wrote:

> Hi,
>
> I tried;
> val df1 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
> val df2 = Seq((1, 1), (2, 2), (3, 3)).toDF("a", "b")
> val df3 = df1.join(df2, "a")
> val df4 = df3.join(df2, "b")
>
> And I got; org.apache.spark.sql.AnalysisException: Reference 'b' is
> ambiguous, could be: b#6, b#14.;
> If same case, this message makes sense and this is clear.
>
> Thought?
>
> // maropu
>
>
>
>
>
>
>
> On Wed, Apr 27, 2016 at 6:09 AM, Prasad Ravilla <pras...@slalom.com>
> wrote:
>
>> Also, check the column names of df1 ( after joining df2 and df3 ).
>>
>> Prasad.
>>
>> From: Ted Yu
>> Date: Monday, April 25, 2016 at 8:35 PM
>> To: Divya Gehlot
>> Cc: "user @spark"
>> Subject: Re: Cant join same dataframe twice ?
>>
>> Can you show us the structure of df2 and df3 ?
>>
>> Thanks
>>
>> On Mon, Apr 25, 2016 at 8:23 PM, Divya Gehlot <divya.htco...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am using Spark 1.5.2 .
>>> I have a use case where I need to join the same dataframe twice on two
>>> different columns.
>>> I am getting error missing Columns
>>>
>>> For instance ,
>>> val df1 = df2.join(df3,"Column1")
>>> Below throwing error missing columns
>>> val df 4 = df1.join(df3,"Column2")
>>>
>>> Is the bug or valid scenario ?
>>>
>>>
>>>
>>>
>>> Thanks,
>>> Divya
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


[Spark 1.5.2]All data being written to only one part file rest part files are empty

2016-04-24 Thread Divya Gehlot
Hi,

After joining two dataframes, saving dataframe using Spark CSV.
But all the result data is being written to only one part file, whereas 200
part files are created and the remaining 199 are empty.

What is the cause of uneven partitioning ? How can I evenly distribute the
data ?
Would really appreciate the help.


Thanks,
Divya


Cant join same dataframe twice ?

2016-04-25 Thread Divya Gehlot
Hi,
I am using Spark 1.5.2 .
I have a use case where I need to join the same dataframe twice on two
different columns.
I am getting a missing-columns error.

For instance,
val df1 = df2.join(df3,"Column1")
The line below throws the missing-columns error:
val df4 = df1.join(df3,"Column2")

Is this a bug or a valid scenario?




Thanks,
Divya


[Ask :]Best Practices - Application logging in Spark 1.5.2 + Scala 2.10

2016-04-21 Thread Divya Gehlot
Hi,
I am using Spark with a Hadoop 2.7 cluster.
I need to write all my print statements and any errors to a file, for
instance some info when a certain step passes, or an error if something is
missing in my Spark Scala script.

Can somebody help me or point me to a tutorial, blog, or book?
What's the best way to achieve it?
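
A minimal sketch of one common setup, assuming the log4j 1.2 that ships with Spark 1.5.2; the logger name, file path, and messages are illustrative:

import org.apache.log4j.Logger

object MyJob {
  val log = Logger.getLogger("MyJob")   // explicit name so it is easy to configure

  def main(args: Array[String]): Unit = {
    log.info("job started")
    // ... Spark code ...
    log.error("something is missing in the input")
  }
}

and in the log4j.properties passed to the driver:

# route this application's logger to its own rolling file
log4j.logger.MyJob=INFO, appFile
log4j.additivity.MyJob=false
log4j.appender.appFile=org.apache.log4j.RollingFileAppender
log4j.appender.appFile.File=/var/log/myapp/spark-driver.log
log4j.appender.appFile.MaxFileSize=50MB
log4j.appender.appFile.MaxBackupIndex=5
log4j.appender.appFile.layout=org.apache.log4j.PatternLayout
log4j.appender.appFile.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n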

Thanks in advance.

Divya


Re: Error joining dataframes

2016-05-18 Thread Divya Gehlot
Can you try
var df_join = df1.join(df2, df1("Id") === df2("Id"), "fullouter").drop(df1("Id"))
On May 18, 2016 2:16 PM, "ram kumar"  wrote:

I tried

scala> var df_join = df1.join(df2, "Id", "fullouter")
:27: error: type mismatch;
 found   : String("Id")
 required: org.apache.spark.sql.Column
   var df_join = df1.join(df2, "Id", "fullouter")
   ^

scala>

And I cant see the above method in
https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html#join(org.apache.spark.sql.DataFrame,%20org.apache.spark.sql.Column,%20java.lang.String)

On Wed, May 18, 2016 at 2:22 AM, Bijay Kumar Pathak 
wrote:

> Hi,
>
> Try this one:
>
>
> df_join = df1.*join*(df2, 'Id', "fullouter")
>
> Thanks,
> Bijay
>
>
> On Tue, May 17, 2016 at 9:39 AM, ram kumar 
> wrote:
>
>> Hi,
>>
>> I tried to join two dataframe
>>
>> df_join = df1.*join*(df2, ((df1("Id") === df2("Id")), "fullouter")
>>
>> df_join.registerTempTable("join_test")
>>
>>
>> When querying "Id" from "join_test"
>>
>> 0: jdbc:hive2://> *select Id from join_test;*
>> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
>> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>> 0: jdbc:hive2://>
>>
>> Is there a way to merge the value of df1("Id") and df2("Id") into one "Id"
>>
>> Thanks
>>
>
>


[Spark 1.5.2]Check Foreign Key constraint

2016-05-11 Thread Divya Gehlot
Hi,
I am using Spark 1.5.2  with Apache Phoenix 4.4
As Spark 1.5.2 doesn't support subqueries in WHERE conditions:
https://issues.apache.org/jira/browse/SPARK-4226

Is there any alternative way to check foreign key constraints?
Would really appreciate the help.
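
A sketch of one workaround that stays within the 1.5.2 DataFrame API (table and column names are hypothetical): an outer join from the child table to the parent, keeping only rows whose parent key is null, i.e. the rows that would violate the foreign key.

val orphans = ordersDF
  .join(customersDF, ordersDF("custId") === customersDF("custId"), "left_outer")
  .filter(customersDF("custId").isNull)
  .select(ordersDF.columns.map(ordersDF(_)): _*)

orphans.count()   // anything greater than 0 means the constraint is violated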



Thanks,
Divya


[Spark 1.5.2] Spark dataframes vs sql query -performance parameter ?

2016-05-03 Thread Divya Gehlot
Hi,
I am interested to know on which parameters we can say Spark DataFrames
are better than SQL queries.
I would be grateful if somebody could explain with use cases.

Thanks,
Divya


Re: spark 1.6.1 build failure of : scala-maven-plugin

2016-05-03 Thread Divya Gehlot
Hi ,
Even I am getting a similar error,
Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile,
when I tried to build the Phoenix project using Maven.
Maven version: 3.3
Java version: 1.7_67
Phoenix: downloaded latest master from GitHub
If anybody finds the resolution, please share.
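
For the Spark 1.6.1 source build itself, a hedged sanity check is to make sure Maven runs on a JDK (not a JRE) of version 7 or later and to use the bundled wrapper, which manages its own Zinc and scala-maven-plugin setup; the JAVA_HOME path below is illustrative:

export JAVA_HOME=/usr/java/jdk1.7.0_79
./build/mvn -DskipTests clean package

"Compile failed via zinc server" sometimes points at a stale Zinc daemon left over from an earlier build; shutting it down (the zinc binary lives under build/zinc-*/bin/) and re-running the build has been reported to help.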


Thanks,
Divya

On 3 May 2016 at 10:18, sunday2000 <2314476...@qq.com> wrote:

> [INFO]
> 
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 14.765 s
> [INFO] Finished at: 2016-05-03T10:08:46+08:00
> [INFO] Final Memory: 35M/191M
> [INFO]
> 
> [ERROR] Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first)
> on project spark-test-tags_2.10: Execution scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed
> -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
> goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
> (scala-compile-first) on project spark-test-tags_2.10: Execution
> scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
> at
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> at
> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
> at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
> at org.apache.maven.cli.MavenCli.execute(MavenCli.java:862)
> at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:286)
> at org.apache.maven.cli.MavenCli.main(MavenCli.java:197)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
> Caused by: org.apache.maven.plugin.PluginExecutionException: Execution
> scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
> at
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:145)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> ... 20 more
> Caused by: Compile failed via zinc server
> at
> sbt_inc.SbtIncrementalCompiler.zincCompile(SbtIncrementalCompiler.java:136)
> at
> sbt_inc.SbtIncrementalCompiler.compile(SbtIncrementalCompiler.java:86)
> at
> scala_maven.ScalaCompilerSupport.incrementalCompile(ScalaCompilerSupport.java:303)
> at
> scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:119)
> at
> scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:99)
> at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:482)
> at
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
> ... 21 more
> [ERROR]
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
> [ERROR]
> [ERROR] After correcting the problems, you can resume the build with the
> command
> [ERROR]   mvn  -rf :spark-test-tags_2.10


Re: spark 1.6.1 build failure of : scala-maven-plugin

2016-05-04 Thread Divya Gehlot
Hi,

My Javac version

C:\Users\Divya>javac -version
javac 1.7.0_79

C:\Users\Divya>java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)

Do I need to use a higher version?


Thanks,
Divya

On 4 May 2016 at 21:31, sunday2000 <2314476...@qq.com> wrote:

> Check your javac version, and update it.
>
>
> -- Original Message ------
> *From:* "Divya Gehlot";<divya.htco...@gmail.com>;
> *Sent:* Wednesday, May 4, 2016, 11:25 AM
> *To:* "sunday2000"<2314476...@qq.com>;
> *Cc:* "user"<user@spark.apache.org>; "user"<u...@phoenix.apache.org>;
> *Subject:* Re: spark 1.6.1 build failure of : scala-maven-plugin
>
> Hi ,
> Even I am getting the similar error
> Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
> When I tried to build Phoenix Project using maven .
> Maven version : 3.3
> Java version - 1.7_67
> Phoenix - downloaded latest master from Git hub
> If anybody find the the resolution please share.
>
>
> Thanks,
> Divya
>
> On 3 May 2016 at 10:18, sunday2000 <2314476...@qq.com> wrote:
>
>> [INFO]
>> 
>> [INFO] BUILD FAILURE
>> [INFO]
>> 
>> [INFO] Total time: 14.765 s
>> [INFO] Finished at: 2016-05-03T10:08:46+08:00
>> [INFO] Final Memory: 35M/191M
>> [INFO]
>> 
>> [ERROR] Failed to execute goal
>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first)
>> on project spark-test-tags_2.10: Execution scala-compile-first of goal
>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed
>> -> [Help 1]
>> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
>> goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
>> (scala-compile-first) on project spark-test-tags_2.10: Execution
>> scala-compile-first of goal
>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
>> at
>> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224)
>> at
>> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
>> at
>> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
>> at
>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
>> at
>> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
>> at
>> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
>> at
>> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
>> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
>> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
>> at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
>> at org.apache.maven.cli.MavenCli.execute(MavenCli.java:862)
>> at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:286)
>> at org.apache.maven.cli.MavenCli.main(MavenCli.java:197)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
>> at
>> org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
>> at
>> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
>> at
>> org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
>> Caused by: org.apache.maven.plugin.PluginExecutionException: Execution
>> scala-compile-first of goal
>> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
>> at
>> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:145)
>> at
>> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
>> ... 20 more
>> Caus

package for data quality in Spark 1.5.2

2016-05-05 Thread Divya Gehlot
Hi,

Is there any package or project in Spark/Scala which supports data quality
checks?
For instance, checking null values or foreign key constraints.

I would really appreciate it if somebody has already done this and is happy
to share, or knows of any open source package.
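
In the meantime, a minimal roll-your-own sketch with the 1.5.2 DataFrame API: count nulls per column in a single pass (df is whatever DataFrame is being checked).

import org.apache.spark.sql.functions.{col, sum, when}

val nullCounts = df.select(
  df.columns.map(c => sum(when(col(c).isNull, 1).otherwise(0)).alias(c)): _*)

nullCounts.show()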


Thanks,
Divya


Fwd: package for data quality in Spark 1.5.2

2016-05-05 Thread Divya Gehlot
http://blog.cloudera.com/blog/2015/07/how-to-do-data-quality-checks-using-apache-spark-dataframes/
I am looking for something similar to the solution above.
-- Forwarded message --
From: "Divya Gehlot" <divya.htco...@gmail.com>
Date: May 5, 2016 6:51 PM
Subject: package for data quality in Spark 1.5.2
To: "user @spark" <user@spark.apache.org>
Cc:

Hi,

Is there any package or project in Spark/scala which supports Data Quality
check?
For instance checking null values , foreign key constraint

Would really appreciate ,if somebody has already done it and happy to share
or has any open source package .


Thanks,
Divya


Re: [Spark 1.5.2 ]-how to set and get Storage level for Dataframe

2016-05-05 Thread Divya Gehlot
But why? Is there any specific reason behind it?
I am aware that we can persist the dataframes, but before proceeding I
would like to know the storage level of my DFs.
I am working on performance tuning of my Spark jobs and am looking for
storage-level APIs like the ones RDDs have.
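
A sketch of what is available in 1.5.2 (df and sqlContext are assumed to exist already): the level can be chosen explicitly at persist time, and while there is no df.getStorageLevel, a registered table can at least be checked for cached-ness, with the effective level visible in the Spark UI's Storage tab.

import org.apache.spark.storage.StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK_SER)   // pick the level instead of the default

df.registerTempTable("my_df")
println(sqlContext.isCached("my_df"))          // true once the plan is in the cache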




Thanks,
Divya

On 6 May 2016 at 11:16, Ted Yu <yuzhih...@gmail.com> wrote:

> I am afraid there is no such API.
>
> When persisting, you can specify StorageLevel :
>
>   def persist(newLevel: StorageLevel): this.type = {
>
> Can you tell us your use case ?
>
> Thanks
>
> On Thu, May 5, 2016 at 8:06 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>> How can I get and set storage level for Dataframes like RDDs ,
>> as mentioned in following  book links
>>
>> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-caching.html
>>
>>
>>
>> Thanks,
>> Divya
>>
>
>


[Spark 1.5.2 ]-how to set and get Storage level for Dataframe

2016-05-05 Thread Divya Gehlot
Hi,
How can I get and set the storage level for DataFrames, like for RDDs,
as mentioned in the following book link:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-caching.html



Thanks,
Divya


Found Data Quality check package for Spark

2016-05-06 Thread Divya Gehlot
Hi,
I just stumbled upon a data quality check package for Spark:
https://github.com/FRosner/drunken-data-quality

Has anybody used it?
Would really appreciate the feedback.




Thanks,
Divya


[Spark 1.5.2] Log4j Configuration for executors

2016-04-18 Thread Divya Gehlot
Hi,
I tried configuring logs to be written to a file for the Spark driver and
executors.
I have two separate log4j properties files for the Spark driver and executor
respectively.
It writes the log for the Spark driver, but for the executor logs I am
getting the error below:

java.io.FileNotFoundException: /home/hdfs/spark_executor.log (Permission
> denied)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.(FileOutputStream.java:221)



Why is it giving permission denied for the executor log whereas it writes
the driver logs?

Am I missing any settings?
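
A likely cause is that executors run as a different OS user on the worker nodes (often yarn), which cannot write under /home/hdfs. A hedged sketch that avoids hard-coded paths on YARN is to point the executor appender at the container's own log directory and ship the properties file with --files:

# log4j_executor.properties (sketch)
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=${spark.yarn.app.container.log.dir}/spark-executor.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

and then submit with:

spark-submit \
  --files log4j_executor.properties \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j_executor.properties" \
  ... rest of the submit command ...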


Would really appreciate the help.



Thanks,

Divya


Fwd: [Help]:Strange Issue :Debug Spark Dataframe code

2016-04-17 Thread Divya Gehlot
Reposting again, as I am unable to find the root cause of where things are
going wrong.

Experts please help .


-- Forwarded message --
From: Divya Gehlot <divya.htco...@gmail.com>
Date: 15 April 2016 at 19:13
Subject: [Help]:Strange Issue :Debug Spark Dataframe code
To: "user @spark" <user@spark.apache.org>


Hi,
I am using Spark 1.5.2 with Scala 2.10.
Is there any other option apart from explain(true) to debug Spark DataFrame
code?

I am facing a strange issue.
I have a lookup dataframe and am using it to join another dataframe on
different columns.

I am getting an *Analysis exception* in the third join.
When I checked the logical plan, it uses the same reference for the key,
but while selecting the columns the references change.
For example
df1 = COLUMN1#15,COLUMN2#16,COLUMN3#17

In the first two joins
I get the same references and the joins succeed:
for the first two joins, with the column COLUMN1#15 I get COLUMN2#16 and
COLUMN3#17.

But at the third join COLUMN1#15 stays the same while the other column
references have been updated to COLUMN2#167,COLUMN3#168.

It throws a Spark Analysis Exception:

> org.apache.spark.sql.AnalysisException: resolved attribute(s) COLUMN1#15
> missing from


After two joins, the dataframe has more than 25 columns.

Could anybody help light the path by holding the torch?
Would really appreciate the help.
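
A sketch of one workaround, consistent with the renaming advice in the "Cant join same dataframe twice ?" thread (all names here are hypothetical): give the lookup dataframe freshly renamed columns before each join, so every join gets a distinct set of attribute references instead of reusing an old COLUMN1#15.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def prefixed(df: DataFrame, p: String): DataFrame =
  df.select(df.columns.map(c => col(c).as(p + c)): _*)

val lkp1 = prefixed(lookupDF, "lkp1_")
val lkp2 = prefixed(lookupDF, "lkp2_")

val step1 = factDF.join(lkp1, factDF("keyA") === lkp1("lkp1_COLUMN1"))
val step2 = step1.join(lkp2, step1("keyB") === lkp2("lkp2_COLUMN1"))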

Thanks,
Divya


  1   2   >