Question regarding kryo and java encoders in datasets

2019-01-03 Thread Devender Yadav
Hi All,



Good day!


I am using Spark 2.4 and referring to
https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

Bean class:

public class EmployeeBean implements Serializable {

    private Long id;
    private String name;
    private Long salary;
    private Integer age;

    // getters and setters
}


Spark Example:

SparkSession spark = SparkSession.builder()
        .master("local[4]")
        .appName("play-with-spark")
        .getOrCreate();

List<EmployeeBean> employees1 = populateEmployees(1, 1_000_000);

Dataset<EmployeeBean> ds1 = spark.createDataset(employees1, Encoders.kryo(EmployeeBean.class));
Dataset<EmployeeBean> ds2 = spark.createDataset(employees1, Encoders.bean(EmployeeBean.class));

ds1.persist(StorageLevel.MEMORY_ONLY());
long ds1Count = ds1.count();

ds2.persist(StorageLevel.MEMORY_ONLY());
long ds2Count = ds2.count();


I looked at the Storage tab in the Spark web UI. The relevant part:

ID  RDD Name                                          Size in Memory
2   LocalTableScan [value#0]                          56.5 MB
13  LocalTableScan [age#6, id#7L, name#8, salary#9L]  23.3 MB


A few questions:

  *   Shouldn't the size of the Kryo-serialized RDD be smaller than the Java-serialized one, instead of more than double the size?

  *   I also tried MEMORY_ONLY_SER(), and the RDD size is the same. An RDD stored as serialized Java objects should be one byte array per partition. Shouldn't the size of the persisted RDDs be smaller than that of the deserialized ones?

  *   What exactly are the Kryo and bean encoders doing while creating the Dataset? (See the schema sketch after this list.)

  *   Can I rename persisted RDDs for better readability?
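
For reference, a minimal sketch (not from the thread, but consistent with the RDD names shown in the Storage tab above) of the structural difference between the two encoders: Encoders.kryo serializes each whole bean into a single binary "value" column, while Encoders.bean maps every field to its own typed column.

ds1.printSchema();
// root
//  |-- value: binary (nullable = true)        <- whole object as one Kryo blob per row

ds2.printSchema();
// root
//  |-- age: integer (nullable = true)
//  |-- id: long (nullable = true)
//  |-- name: string (nullable = true)
//  |-- salary: long (nullable = true)

Because the bean encoder lays the fields out in Spark's compact columnar format, it can end up noticeably smaller than a per-row Kryo blob.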



Regards,
Devender









Re: Add column value in the dataset on the basis of a condition

2018-12-18 Thread Devender Yadav
Thanks, Yunus. It solved my problem.


Regards,
Devender

From: Shahab Yunus 
Sent: Tuesday, December 18, 2018 8:27:51 PM
To: Devender Yadav
Cc: user@spark.apache.org
Subject: Re: Add column value in the dataset on the basis of a condition

Sorry Devender, I hit the send button sooner by mistake. I meant to add more 
info.

So what I was trying to say was that you can use withColumn with when/otherwise 
clauses to add a column conditionally. See an example here:
https://stackoverflow.com/questions/34908448/spark-add-column-to-dataframe-conditionally
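
A minimal sketch of that approach (assuming the Spark 2.x Java API and the ds1 dataset from the question quoted below):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.when;

// One pass over the data: no need for two filtered datasets and a union.
Dataset<Row> withFlag = ds1.withColumn("is_age_null",
        when(col("age").isNull(), lit(true)).otherwise(lit(false)));
withFlag.show();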

On Tue, Dec 18, 2018 at 9:55 AM Shahab Yunus <shahab.yu...@gmail.com> wrote:
Have you tried using withColumn? You can add a boolean column based on whether 
the age exists or not and then drop the older age column. You wouldn't need 
union of dataframes then

On Tue, Dec 18, 2018 at 8:58 AM Devender Yadav <devender.ya...@impetus.co.in> wrote:
Hi All,


Useful code:

public class EmployeeBean implements Serializable {

    private Long id;
    private String name;
    private Long salary;
    private Integer age;

    // getters and setters
}


Relevant Spark code:

SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("play-with-spark")
        .getOrCreate();

List<EmployeeBean> employees1 = populateEmployees(1, 10);

Dataset<EmployeeBean> ds1 = spark.createDataset(employees1, Encoders.bean(EmployeeBean.class));
ds1.show();
ds1.printSchema();

Dataset<Row> ds2 = ds1.where("age is null").withColumn("is_age_null", lit(true));
Dataset<Row> ds3 = ds1.where("age is not null").withColumn("is_age_null", lit(false));

Dataset<Row> ds4 = ds2.union(ds3);
ds4.show();


Relevant Output:


ds1

+----+---+----+------+
| age| id|name|salary|
+----+---+----+------+
|null|  1|dev1| 11000|
|   2|  2|dev2| 12000|
|null|  3|dev3| 13000|
|   4|  4|dev4| 14000|
|null|  5|dev5| 15000|
+----+---+----+------+


ds4

+----+---+----+------+-----------+
| age| id|name|salary|is_age_null|
+----+---+----+------+-----------+
|null|  1|dev1| 11000|       true|
|null|  3|dev3| 13000|       true|
|null|  5|dev5| 15000|       true|
|   2|  2|dev2| 12000|      false|
|   4|  4|dev4| 14000|      false|
+----+---+----+------+-----------+


Is there a better way to add this column to the Dataset than creating two Datasets and performing a union?

<https://stackoverflow.com/questions/53834286/add-column-value-in-spark-dataset-on-the-basis-of-the-condition>



Regards,
Devender

















Add column value in the dataset on the basis of a condition

2018-12-18 Thread Devender Yadav
Hi All,


Useful code:

public class EmployeeBean implements Serializable {

    private Long id;
    private String name;
    private Long salary;
    private Integer age;

    // getters and setters
}


Relevant Spark code:

SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("play-with-spark")
        .getOrCreate();

List<EmployeeBean> employees1 = populateEmployees(1, 10);

Dataset<EmployeeBean> ds1 = spark.createDataset(employees1, Encoders.bean(EmployeeBean.class));
ds1.show();
ds1.printSchema();

Dataset<Row> ds2 = ds1.where("age is null").withColumn("is_age_null", lit(true));
Dataset<Row> ds3 = ds1.where("age is not null").withColumn("is_age_null", lit(false));

Dataset<Row> ds4 = ds2.union(ds3);
ds4.show();


Relevant Output:


ds1

+----+---+----+------+
| age| id|name|salary|
+----+---+----+------+
|null|  1|dev1| 11000|
|   2|  2|dev2| 12000|
|null|  3|dev3| 13000|
|   4|  4|dev4| 14000|
|null|  5|dev5| 15000|
+----+---+----+------+


ds4

+----+---+----+------+-----------+
| age| id|name|salary|is_age_null|
+----+---+----+------+-----------+
|null|  1|dev1| 11000|       true|
|null|  3|dev3| 13000|       true|
|null|  5|dev5| 15000|       true|
|   2|  2|dev2| 12000|      false|
|   4|  4|dev4| 14000|      false|
+----+---+----+------+-----------+


Is there a better way to add this column to the Dataset than creating two Datasets and performing a union?





Regards,
Devender









Create dataframe from RDBMS table using JDBC

2017-04-26 Thread Devender Yadav
Hi All,


I am using Spark 1.6.2.


Which is the preferable way to create a DataFrame from an RDBMS table?


DataFrame df = sqlContext.read().format("jdbc").options(options).load();

or

DataFrame df = sqlContext.read().jdbc(url, table, properties);
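
For comparison, a minimal sketch (the connection values and table name are placeholders, not from the thread) showing how each form is typically wired up; the options map and the Properties object carry the same information, so the two are effectively equivalent and the choice mostly comes down to readability:

Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://dbhost:3306/testdb");   // placeholder URL
options.put("dbtable", "employee");                      // placeholder table
options.put("driver", "com.mysql.jdbc.Driver");
DataFrame viaFormat = sqlContext.read().format("jdbc").options(options).load();

Properties props = new Properties();
props.setProperty("driver", "com.mysql.jdbc.Driver");
DataFrame viaJdbc = sqlContext.read().jdbc("jdbc:mysql://dbhost:3306/testdb", "employee", props);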



Regards,
Devender










Re: Arraylist is empty after JavaRDD.foreach

2017-04-24 Thread Devender Yadav
Hi Franke,


I want to convert a DataFrame to a JSON string.
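
A minimal sketch of that conversion (assuming the Spark 1.6 Java API): collect the per-row JSON on the driver instead of mutating a shared list inside foreach. The foreach closure is serialized and run in executor tasks, so the driver's list never sees those additions.

List<Map<String, String>> jsonData = new ArrayList<>();
ObjectMapper mapper = new ObjectMapper();
try {
    // toJSON() yields one JSON string per row; collect() brings them back to the driver.
    for (String line : df.toJSON().toJavaRDD().collect()) {
        jsonData.add(mapper.readValue(line, Map.class));
    }
} catch (IOException e) {
    e.printStackTrace();
}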


Regards,
Devender

From: Jörn Franke <jornfra...@gmail.com>
Sent: Monday, April 24, 2017 11:15:08 PM
To: Devender Yadav
Cc: user@spark.apache.org
Subject: Re: Arraylist is empty after JavaRDD.foreach

I am not sure what you are trying to achieve here. You should never use the ArrayList the way you use it here, as a global variable (an anti-pattern). Why don't you use the count function of the DataFrame?

On 24. Apr 2017, at 19:36, Devender Yadav <devender.ya...@impetus.co.in> wrote:


Hi All,


I am using Spark 1.6.2 and Java 7.


Sample json (total 100 records):

{"name":"dev","salary":1,"occupation":"engg","address":"noida"}

{"name":"karthik","salary":2,"occupation":"engg","address":"noida"}

Useful code:

final List<Map<String, String>> jsonData = new ArrayList<>();

DataFrame df = sqlContext.read().json("file:///home/dev/data-json/emp.json");
JavaRDD<String> rdd = df.repartition(1).toJSON().toJavaRDD();

rdd.foreach(new VoidFunction<String>() {
    @Override
    public void call(String line) {
        try {
            jsonData.add(new ObjectMapper().readValue(line, Map.class));
            System.out.println(Thread.currentThread().getName());
            System.out.println("List size: " + jsonData.size());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});

System.out.println(Thread.currentThread().getName());
System.out.println("List size: " + jsonData.size());

The jsonData list is empty at the end.


Output:

Executor task launch worker-1
List size: 1
Executor task launch worker-1
List size: 2
Executor task launch worker-1
List size: 3
.
.
.
Executor task launch worker-1
List size: 100

main
List size: 0



Regards,
Devender


















How to convert DataFrame to JSON String in Java 7

2017-04-24 Thread Devender Yadav
Hi All,



How can I convert a DataFrame to a JSON string in Java 7? I am using Spark 1.6.3.


I don't want to print it to the console. I need to return the JSON string to another method.
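
A minimal sketch (assuming the Spark 1.6 Java API and Java 7, so no streams or String.join) that returns the whole DataFrame as one JSON array string:

static String dataFrameToJsonArray(DataFrame df) {
    List<String> rows = df.toJSON().toJavaRDD().collect();   // one JSON object per row
    StringBuilder sb = new StringBuilder("[");
    for (int i = 0; i < rows.size(); i++) {
        if (i > 0) {
            sb.append(",");
        }
        sb.append(rows.get(i));
    }
    return sb.append("]").toString();
}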


Thanks for your attention!



Regards,
Devender










Arraylist is empty after JavaRDD.foreach

2017-04-24 Thread Devender Yadav
Hi All,


I am using Spark 1.6.2 and Java 7.


Sample json (total 100 records):

{"name":"dev","salary":1,"occupation":"engg","address":"noida"}

{"name":"karthik","salary":2,"occupation":"engg","address":"noida"}

Useful code:

final List<Map<String, String>> jsonData = new ArrayList<>();

DataFrame df = sqlContext.read().json("file:///home/dev/data-json/emp.json");
JavaRDD<String> rdd = df.repartition(1).toJSON().toJavaRDD();

rdd.foreach(new VoidFunction<String>() {
    @Override
    public void call(String line) {
        try {
            jsonData.add(new ObjectMapper().readValue(line, Map.class));
            System.out.println(Thread.currentThread().getName());
            System.out.println("List size: " + jsonData.size());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});

System.out.println(Thread.currentThread().getName());
System.out.println("List size: " + jsonData.size());

The jsonData list is empty at the end.


Output:

Executor task launch worker-1
List size: 1
Executor task launch worker-1
List size: 2
Executor task launch worker-1
List size: 3
.
.
.
Executor task launch worker-1
List size: 100

main
List size: 0



Regards,
Devender










Re: How to maintain order of key-value in DataFrame same as JSON?

2017-04-24 Thread Devender Yadav
Thanks Hemanth for a quick reply.


From: Hemanth Gudela <hemanth.gud...@qvantel.com>
Sent: Monday, April 24, 2017 6:37:48 PM
To: Devender Yadav; user@spark.apache.org
Subject: Re: How to maintain order of key-value in DataFrame same as JSON?

Hi,

One option, if you can use it, is to force the df to use the schema order you prefer, like this:

DataFrame df = sqlContext.read().json(jsonPath)
        .select("name", "salary", "occupation", "address");

-Hemanth

From: Devender Yadav <devender.ya...@impetus.co.in>
Date: Monday, 24 April 2017 at 15.45
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: How to maintain order of key-value in DataFrame same as JSON?


{"name": "dev","salary": 100,"occupation": "engg","address": "noida"}

{"name": "karthik","salary": 200,"occupation": "engg","address": "blore"}










How to maintain order of key-value in DataFrame same as JSON?

2017-04-24 Thread Devender Yadav
Hi All,


Sample JSON data:

{"name": "dev","salary": 100,"occupation": "engg","address": "noida"}

{"name": "karthik","salary": 200,"occupation": "engg","address": "blore"}

Spark Java code:

DataFrame df = sqlContext.read().json(jsonPath);
df.printSchema();
df.show(false);


Output:

root
 |-- address: string (nullable = true)
 |-- name: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- salary: long (nullable = true)


+-------+-------+----------+------+
|address|name   |occupation|salary|
+-------+-------+----------+------+
|noida  |dev    |engg      |100   |
|blore  |karthik|engg      |200   |
+-------+-------+----------+------+


Columns are arranged in alphabetical order.


Is there any way to maintain the natural (original) order?
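
One possible workaround (an assumption, not from the thread): supply the schema explicitly when reading, so the columns keep the declared order instead of the alphabetical order produced by schema inference.

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("name", DataTypes.StringType, true),
        DataTypes.createStructField("salary", DataTypes.LongType, true),
        DataTypes.createStructField("occupation", DataTypes.StringType, true),
        DataTypes.createStructField("address", DataTypes.StringType, true)
});

DataFrame df = sqlContext.read().schema(schema).json(jsonPath);
df.printSchema();   // fields come out in the declared order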



Regards,
Devender










Partitioning in spark while reading from RDBMS via JDBC

2017-03-31 Thread Devender Yadav
Hi All,


I am running Spark in cluster mode and reading data from an RDBMS via JDBC.

As per the Spark docs, these partitioning parameters describe how to partition the table when reading in parallel from multiple workers:

partitionColumn,
lowerBound,
upperBound,
numPartitions


These are optional parameters.

What would happen if I don't specify these (a sketch of the fully specified form follows after this list):

  *   Does only one worker read the whole data?
  *   If it still reads in parallel, how does it partition the data?
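
For reference, a minimal sketch of the fully specified partitioned read (the connection URL, table, and column names are placeholders; the option keys are Spark's JDBC data source options):

Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:postgresql://dbhost:5432/testdb");   // placeholder
options.put("dbtable", "employee");                           // placeholder
options.put("partitionColumn", "id");     // must be a numeric column
options.put("lowerBound", "1");
options.put("upperBound", "1000000");
options.put("numPartitions", "10");       // ten roughly equal id ranges, ten parallel reads

DataFrame df = sqlContext.read().format("jdbc").options(options).load();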



Regards,
Devender










How to insert nano seconds in the TimestampType in Spark

2017-03-27 Thread Devender Yadav
Hi All,

I am using Spark version 1.6.1.

I have a text table in Hive with a `timestamp` column that has nanosecond precision.

Hive Table Schema:

c_timestamp timestamp

Hive Table data:

hive> select * from tbl1;
OK
00:00:00.1
12:12:12.123456789
23:59:59.9

But as per the docs, from Spark 1.5

Timestamps are now stored at a precision of 1us, rather than 1ns

Sample code:

SparkConf conf = new SparkConf(true).setMaster("yarn-cluster").setAppName("SAMPLE_APP");
SparkContext sc = new SparkContext(conf);
HiveContext hc = new HiveContext(sc);
DataFrame df = hc.table("testdb.tbl1");

Data is truncated to microseconds.

00:00:00
12:12:12.123456
23:59:59.99


Is there any way to use nanoseconds here?
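
One possible workaround (an assumption, not something from the thread): since Spark's TimestampType is backed by a microsecond-precision value, carry the sub-microsecond part in a separate column. A minimal Java sketch of the split:

import java.sql.Timestamp;

// Hypothetical helper: epoch microseconds for Spark's TimestampType,
// plus the 0-999 nanosecond remainder that Spark would otherwise drop.
public class NanoSplit {
    public static long[] split(Timestamp ts) {
        long epochMicros = (ts.getTime() / 1000L) * 1_000_000L + ts.getNanos() / 1000L;
        long nanoRemainder = ts.getNanos() % 1000L;
        return new long[] { epochMicros, nanoRemainder };
    }
}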


Regards,
Devender








