This is very simple

in Hive

Status: Running (Hive on Spark job[1])
Job Progress Format
CurrentTime StageId_StageAttemptId:
SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
[StageCost]
2016-07-06 17:17:16,006 Stage-1_0: 0(+1)/1
2016-07-06 17:17:17,011 Stage-1_0: 1/1 Finished
Status: Finished successfully in 2.02 seconds
OK
1001    aba     10      10      DEV
1002    abs     20      20      TEST
1003    abd     10      10      DEV
1001    aba     10      10      DEV
1002    abs     20      20      TEST
1003    abd     10      10      DEV
1004    abf     30      30      IT
1005    abg     10      10      DEV
1004    abf     30      30      IT
1005    abg     10      10      DEV
1006    abh     20      20      TEST
1007    abj     10      10      DEV
1006    abh     20      20      TEST
1007    abj     10      10      DEV
1008    abk     30      30      IT
1009    abl     20      20      TEST
1010    abq     10      10      DEV
1008    abk     30      30      IT
1009    abl     20      20      TEST
1010    abq     10      10      DEV


In Spark

scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
HiveContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@f9402c2

scala> HiveContext.sql("use test")
res1: org.apache.spark.sql.DataFrame = [result: string]

scala> val e = HiveContext.table("emp")
e: org.apache.spark.sql.DataFrame = [emp_id: int, name: string, deptid: int]
scala> val d = HiveContext.table("dept")

d: org.apache.spark.sql.DataFrame = [deptid: int, dept_name: string]
scala> val rs = e.join(d,e("deptid")===d("deptid"),
"fullouter").collect.foreach(println)
[1001,aba,10,10,DEV]
[1003,abd,10,10,DEV]
[1001,aba,10,10,DEV]
[1003,abd,10,10,DEV]
[1005,abg,10,10,DEV]
[1005,abg,10,10,DEV]
[1007,abj,10,10,DEV]
[1007,abj,10,10,DEV]
[1010,abq,10,10,DEV]
[1010,abq,10,10,DEV]
[1002,abs,20,20,TEST]
[1002,abs,20,20,TEST]
[1006,abh,20,20,TEST]
[1006,abh,20,20,TEST]
[1009,abl,20,20,TEST]
[1009,abl,20,20,TEST]
[1004,abf,30,30,IT]
[1004,abf,30,30,IT]
[1008,abk,30,30,IT]
[1008,abk,30,30,IT]



Note that you need to enforce ordering

HTH











Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 6 July 2016 at 14:00, ayan guha <guha.a...@gmail.com> wrote:

> looks like a data issue to me. Either EMP or DEPT has spaces in dept id
> for deptid=20,30.
>
> Did you check in hive cli?
>
> On Wed, Jul 6, 2016 at 10:33 PM, radha <grkmc...@gmail.com> wrote:
>
>> Hi All,
>>
>> Please check below for the code and input and output, i think the output
>> is
>> not correct, i  am missing any thing? pls guide
>>
>> Code
>>
>> public class Test {
>>         private static JavaSparkContext jsc = null;
>>         private static SQLContext sqlContext = null;
>>         private static Configuration hadoopConf = null;
>>         public static void main(String[] args) {
>>
>>                 jsc = GlobalSparkContext.getJavaSparkContext();
>>                 sqlContext = GlobalSparkContext.getSQLContext(jsc);
>>
>>                 hadoopConf = new Configuration(jsc.hadoopConfiguration());
>>
>>
>> hadoopConf.set("textinputformat.record.delimiter",GlobalSparkContext.lineSeparator);
>>
>>                 try {
>>                         final Emp emp = new Emp();
>>                         final Dept dept = new Dept();
>>
>>                         JavaPairRDD<LongWritable, Text> deptinputLines =
>> jsc.newAPIHadoopFile(args[0], TextInputFormat.class,LongWritable.class,
>> Text.class, hadoopConf);
>>                         JavaRDD<Dept> deptRDD = deptinputLines.map(new
>> Function<Tuple2&lt;LongWritable, Text>, String>() {
>>                                                 @Override
>>                                                 public String
>> call(Tuple2<LongWritable, Text> arg0)     throws Exception {
>>                                                         return
>> arg0._2.toString();
>>                                                 }
>>
>>                                         }).map(new Function<String,
>> Dept>() {
>>
>>                                 public Dept call(String recordLine)
>> throws Exception {
>>                                         String[] parts =
>> recordLine.split(GlobalSparkContext.recordSeparator);
>>                                         return getInstanceDept(parts,
>> dept);
>>                                 }
>>                         });
>>
>>                         DataFrame deptDF =
>> sqlContext.createDataFrame(deptRDD, Dept.class);
>>                         deptDF.registerTempTable("DEPT");
>>                         //deptDF.show();
>>
>>                         JavaPairRDD<LongWritable, Text> inputLines =
>> jsc.newAPIHadoopFile(args[1], TextInputFormat.class, LongWritable.class,
>> Text.class, hadoopConf);
>>                         JavaRDD<Emp> empRDD = inputLines.map(new
>> Function<Tuple2&lt;LongWritable,
>> Text>, String>() {
>>
>>                                                 private static final long
>> serialVersionUID = 3371707560417405016L;
>>
>>                                                 @Override
>>                                                 public String
>> call(Tuple2<LongWritable, Text> arg0)     throws Exception {
>>                                                         return
>> arg0._2.toString();
>>                                                 }
>>
>>                                         }).map(new Function<String,
>> Emp>() {
>>
>>                                 private static final long
>> serialVersionUID = 7656942162815285622L;
>>
>>                                 public Emp call(String recordLine) throws
>> Exception {
>>                                         String[] parts =
>> recordLine.split(GlobalSparkContext.recordSeparator);
>>                                         return getInstance(parts, emp);
>>                                 }
>>                         });
>>                         DataFrame empDF =
>> sqlContext.createDataFrame(empRDD, Emp.class);
>>                         empDF.registerTempTable("EMP");
>>
>>                    sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN
>> DEPT d ON e.deptid
>> = d.deptid").show();
>>
>>
>>
>> //empDF.join(deptDF,empDF.col("deptid").equalTo(deptDF.col("deptid")),"leftouter").show();;
>>
>>                 }
>>                 catch(Exception e){
>>                         System.out.println(e);
>>                 }
>>         }
>>         public static Emp getInstance(String[] parts, Emp emp) throws
>> ParseException {
>>                 emp.setId(parts[0]);
>>                 emp.setName(parts[1]);
>>                 emp.setDeptid(parts[2]);
>>
>>                 return emp;
>>         }
>>         public static Dept getInstanceDept(String[] parts, Dept dept)
>> throws
>> ParseException {
>>                 dept.setDeptid(parts[0]);
>>                 dept.setDeptname(parts[1]);
>>                 return dept;
>>         }
>> }
>>
>> Input
>> Emp
>> 1001 aba 10
>> 1002 abs 20
>> 1003 abd 10
>> 1004 abf 30
>> 1005 abg 10
>> 1006 abh 20
>> 1007 abj 10
>> 1008 abk 30
>> 1009 abl 20
>> 1010 abq 10
>>
>> Dept
>> 10 dev
>> 20 Test
>> 30 IT
>>
>> Output
>> +------+------+----+------+--------+
>> |deptid|    id|name|deptid|deptname|
>> +------+------+----+------+--------+
>> |    10|  1001| aba|    10|     dev|
>> |    10|  1003| abd|    10|     dev|
>> |    10|  1005| abg|    10|     dev|
>> |    10|  1007| abj|    10|     dev|
>> |    10|  1010| abq|    10|     dev|
>> |    20|  1002| abs|  null|    null|
>> |    20|  1006| abh|  null|    null|
>> |    20|  1009| abl|  null|    null|
>> |    30|  1004| abf|  null|    null|
>> |    30|  1008| abk|  null|    null|
>> +------+------+----+------+--------+
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Left-outer-Join-issue-using-programmatic-sql-joins-tp27295.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>

Reply via email to