Yes, both columns appear in the output.
Sharing the execution flow:

15/12/28 00:19:15 INFO SessionState: No Tez session required at this point.
hive.execution.engine=mr.
15/12/28 00:19:15 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext

scala> import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql.hive.orc._

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
15/12/28 00:20:15 WARN SparkConf: The configuration key
'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark
1.3 and may be removed in the future. Please use the new key
'spark.yarn.am.waitTime' instead.
15/12/28 00:20:15 INFO HiveContext: Initializing execution hive, version
0.13.1
hiveContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@9046f81

scala> import org.apache.spark.sql.types.{StructType, StructField,
StringType, IntegerType, FloatType, LongType, TimestampType}
import org.apache.spark.sql.types.{StructType, StructField, StringType,
IntegerType, FloatType, LongType, TimestampType}

scala> val carsSchema = StructType(Seq(StructField("year", IntegerType,
true), StructField("make", StringType, true), StructField("model",
StringType, true), StructField("comment", StringType, true),
StructField("blank", StringType, true)))
carsSchema: org.apache.spark.sql.types.StructType =
StructType(StructField(year,IntegerType,true),
StructField(make,StringType,true), StructField(model,StringType,true),
StructField(comment,StringType,true), StructField(blank,StringType,true))

scala> val carsdf =
hiveContext.read.format("com.databricks.spark.csv").option("header",
"true").schema(carsSchema).load("/tmp/TestDivya/cars.csv")
15/12/28 00:20:45 INFO HiveContext: Initializing HiveMetastoreConnection
version 0.13.1 using Spark classes.
carsdf: org.apache.spark.sql.DataFrame = [year: int, make: string, model:
string, comment: string, blank: string]
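
(A quick sanity check at this point, not part of the original session:
show() and count are standard DataFrame methods and would confirm the CSV
rows actually loaded before joining.)

carsdf.show(5)    // prints the first 5 rows with the schema applied
carsdf.count      // total row count; 0 would mean the load silently failed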

scala> val carUsersSchema = StructType(Seq(StructField("Name", StringType,
true), StructField("Car_Model", StringType, true)))
carUsersSchema: org.apache.spark.sql.types.StructType =
StructType(StructField(Name,StringType,true),
StructField(Car_Model,StringType,true))

scala> val carUsersdf =
hiveContext.read.format("com.databricks.spark.csv").option("header",
"false").schema(carUsersSchema).load("/tmp/TestDivya/CarUsers.csv")
carUsersdf: org.apache.spark.sql.DataFrame = [Name: string, Car_Model:
string]

scala> val joineddf = carsdf.join(carUsersdf, carsdf("model") ===
carUsersdf("Car_Model")).select(carUsersdf("Name"), carsdf("make"),
carUsersdf("Car_Model"))
joineddf: org.apache.spark.sql.DataFrame = [Name: string, make: string,
Car_Model: string]

scala> joineddf.collect.foreach(println)
........................................
..............................................

15/12/28 00:21:35 INFO DAGScheduler: ResultStage 3 (collect at
<console>:39) finished in 2.261 s
15/12/28 00:21:35 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have
all completed, from pool
15/12/28 00:21:35 INFO DAGScheduler: Job 1 finished: collect at
<console>:39, took 5.323441 s
[Name3,Chevy,Volt]
[Name6,Chevy,Volt]
[Name1,Tesla,S]
[Name4,Tesla,S]
[Name2,Ford,E350]
[Name5,Ford,E350]
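
(An aside, not from the original run: collect() materializes every row on
the driver, so for a quick peek at a large join, show() is the safer
standard call since it only fetches a bounded number of rows:)

joineddf.show(10)   // prints at most 10 rows instead of collecting them all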

scala>


scala> joineddf.write.format("com.databricks.spark.csv").option("header",
"true").save("/tmp/TestDivya/CarUserData.csv")
15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
......................................................................
......................................................................
15/12/28 00:25:40 INFO YarnScheduler: Removed TaskSet 6.0, whose tasks have
all completed, from pool
15/12/28 00:25:40 INFO DAGScheduler: Job 2 finished: saveAsTextFile at
package.scala:157, took 9.293578 s
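
(For completeness, a minimal read-back along these lines would verify that
the save wrote data rows and not just the header; this was not part of the
original session, and the path is the one passed to save() above:)

val savedDf = hiveContext.read.format("com.databricks.spark.csv")
  .option("header", "true").load("/tmp/TestDivya/CarUserData.csv")
savedDf.count   // should be 6, matching the joined rows printed earlier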

P.S.: Attaching the output file.

On 28 December 2015 at 12:52, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you confirm that file1df("COLUMN2") and file2df("COLUMN10") appeared
> in the output of joineddf.collect.foreach(println)?
>
> Thanks
>
> On Sun, Dec 27, 2015 at 6:32 PM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>> Hi,
>> I am trying to join two dataframes and am able to display the results in
>> the console after the join. I am then saving the joined data in CSV
>> format using the spark-csv API. It saves just the column names, not the
>> data at all.
>>
>> Below is the sample code for the reference:
>>
>> spark-shell   --packages com.databricks:spark-csv_2.10:1.1.0  --master
>>> yarn-client --driver-memory 512m --executor-memory 512m
>>>
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.sql.hive.orc._
>>> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>>> IntegerType, FloatType, LongType, TimestampType, NullType}
>>>
>>> val firstSchema = StructType(Seq(StructField("COLUMN1", StringType,
>>> true), StructField("COLUMN2", StringType, true), StructField("COLUMN3",
>>> StringType, true), StructField("COLUMN4", StringType, true),
>>> StructField("COLUMN5", StringType, true), StructField("COLUMN6",
>>> StringType, true)))
>>> val file1df =
>>> hiveContext.read.format("com.databricks.spark.csv").option("header",
>>> "true").schema(firstSchema).load("/tmp/File1.csv")
>>>
>>>
>>> val secondSchema = StructType(Seq(
>>> StructField("COLUMN1", StringType, true),
>>> StructField("COLUMN2", NullType  , true),
>>> StructField("COLUMN3", TimestampType , true),
>>> StructField("COLUMN4", TimestampType , true),
>>> StructField("COLUMN5", NullType , true),
>>> StructField("COLUMN6", StringType, true),
>>> StructField("COLUMN7", IntegerType, true),
>>> StructField("COLUMN8", IntegerType, true),
>>> StructField("COLUMN9", StringType, true),
>>> StructField("COLUMN10", IntegerType, true),
>>> StructField("COLUMN11", IntegerType, true),
>>> StructField("COLUMN12", IntegerType, true)))
>>>
>>>
>>> val file2df =
>>> hiveContext.read.format("com.databricks.spark.csv").option("header",
>>> "false").schema(secondSchema).load("/tmp/file2.csv")
>>> val joineddf = file1df.join(file2df, file1df("COLUMN1") ===
>>> file2df("COLUMN6"))
>>> val selecteddata =
>>> joineddf.select(file1df("COLUMN2"),file2df("COLUMN10"))
>>>
>> // the statement below prints the joined data
>>
>>> joineddf.collect.foreach(println)
>>>
>>
>>
>>> // this statement saves the CSV file, but only the column names from
>>> // the select above are saved; no data rows appear in the output
>>> selecteddata.write.format("com.databricks.spark.csv").option("header",
>>> "true").save("/tmp/JoinedData.csv")
>>>
>>
>>
>> Would really appreciate the pointers /help.
>>
>> Thanks,
>> Divya
>>
>>
>>
>>
>>
>
