Yes, sharing the execution flow:

15/12/28 00:19:15 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr.
15/12/28 00:19:15 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext

scala> import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql.hive.orc._

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
15/12/28 00:20:15 WARN SparkConf: The configuration key 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark 1.3 and may be removed in the future. Please use the new key 'spark.yarn.am.waitTime' instead.
15/12/28 00:20:15 INFO HiveContext: Initializing execution hive, version 0.13.1
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@9046f81

scala> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType, LongType, TimestampType}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType, LongType, TimestampType}

scala> val carsSchema = StructType(Seq(StructField("year", IntegerType, true), StructField("make", StringType, true), StructField("model", StringType, true), StructField("comment", StringType, true), StructField("blank", StringType, true)))
carsSchema: org.apache.spark.sql.types.StructType = StructType(StructField(year,IntegerType,true), StructField(make,StringType,true), StructField(model,StringType,true), StructField(comment,StringType,true), StructField(blank,StringType,true))

scala> val carsdf = hiveContext.read.format("com.databricks.spark.csv").option("header", "true").schema(carsSchema).load("/tmp/TestDivya/cars.csv")
15/12/28 00:20:45 INFO HiveContext: Initializing HiveMetastoreConnection version 0.13.1 using Spark classes.
carsdf: org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

scala> val carUsersSchema = StructType(Seq(StructField("Name", StringType, true), StructField("Car_Model", StringType, true)))
carUsersSchema: org.apache.spark.sql.types.StructType = StructType(StructField(Name,StringType,true), StructField(Car_Model,StringType,true))

scala> val carUsersdf = hiveContext.read.format("com.databricks.spark.csv").option("header", "false").schema(carUsersSchema).load("/tmp/TestDivya/CarUsers.csv")
carUsersdf: org.apache.spark.sql.DataFrame = [Name: string, Car_Model: string]

scala> val joineddf = carsdf.join(carUsersdf, carsdf("model") === carUsersdf("Car_Model")).select(carUsersdf("Name"), carsdf("make"), carUsersdf("Car_Model"))
joineddf: org.apache.spark.sql.DataFrame = [Name: string, make: string, Car_Model: string]

scala> joineddf.collect.foreach(println)
......................................................................
15/12/28 00:21:35 INFO DAGScheduler: ResultStage 3 (collect at <console>:39) finished in 2.261 s
15/12/28 00:21:35 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
15/12/28 00:21:35 INFO DAGScheduler: Job 1 finished: collect at <console>:39, took 5.323441 s
[Name3,Chevy,Volt]
[Name6,Chevy,Volt]
[Name1,Tesla,S]
[Name4,Tesla,S]
[Name2,Ford,E350]
[Name5,Ford,E350]

scala> joineddf.write.format("com.databricks.spark.csv").option("header", "true").save("/tmp/TestDivya/CarUserData.csv")
15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
......................................................................
15/12/28 00:25:40 INFO YarnScheduler: Removed TaskSet 6.0, whose tasks have all completed, from pool
15/12/28 00:25:40 INFO DAGScheduler: Job 2 finished: saveAsTextFile at package.scala:157, took 9.293578 s
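As a sanity check on the save (a minimal sketch, assuming the same spark-shell session and the paths above), the output directory can be read back with the same spark-csv options and counted; the expected count of 6 corresponds to the six joined rows printed earlier:

scala> val written = hiveContext.read.format("com.databricks.spark.csv").option("header", "true").load("/tmp/TestDivya/CarUserData.csv")
scala> written.show()   // should list the six joined rows, not just a header line
scala> written.count    // expect 6 if the data, and not only the header, was written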
P.S.: Attaching the output file.

On 28 December 2015 at 12:52, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you confirm that file1df("COLUMN2") and file2df("COLUMN10") appeared
> in the output of joineddf.collect.foreach(println)?
>
> Thanks
>
> On Sun, Dec 27, 2015 at 6:32 PM, Divya Gehlot <divya.htco...@gmail.com> wrote:
>
>> Hi,
>> I am trying to join two DataFrames and can display the results in the
>> console after the join. I then save the joined data in CSV format using
>> the spark-csv API, but it saves only the column names, not the data.
>>
>> Below is the sample code for reference:
>>
>>> spark-shell --packages com.databricks:spark-csv_2.10:1.1.0 --master yarn-client --driver-memory 512m --executor-memory 512m
>>>
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.sql.hive.orc._
>>> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType, LongType, TimestampType, NullType}
>>>
>>> val firstSchema = StructType(Seq(
>>>   StructField("COLUMN1", StringType, true),
>>>   StructField("COLUMN2", StringType, true),
>>>   StructField("COLUMN2", StringType, true),
>>>   StructField("COLUMN3", StringType, true),
>>>   StructField("COLUMN4", StringType, true),
>>>   StructField("COLUMN5", StringType, true)))
>>> val file1df = hiveContext.read.format("com.databricks.spark.csv").option("header", "true").schema(firstSchema).load("/tmp/File1.csv")
>>>
>>> val secondSchema = StructType(Seq(
>>>   StructField("COLUMN1", StringType, true),
>>>   StructField("COLUMN2", NullType, true),
>>>   StructField("COLUMN3", TimestampType, true),
>>>   StructField("COLUMN4", TimestampType, true),
>>>   StructField("COLUMN5", NullType, true),
>>>   StructField("COLUMN6", StringType, true),
>>>   StructField("COLUMN7", IntegerType, true),
>>>   StructField("COLUMN8", IntegerType, true),
>>>   StructField("COLUMN9", StringType, true),
>>>   StructField("COLUMN10", IntegerType, true),
>>>   StructField("COLUMN11", IntegerType, true),
>>>   StructField("COLUMN12", IntegerType, true)))
>>>
>>> val file2df = hiveContext.read.format("com.databricks.spark.csv").option("header", "false").schema(secondSchema).load("/tmp/file2.csv")
>>> val joineddf = file1df.join(file2df, file1df("COLUMN1") === file2df("COLUMN6"))
>>> val selecteddata = joineddf.select(file1df("COLUMN2"), file2df("COLUMN10"))
>>
>> // the below statement prints the joined data
>>> joineddf.collect.foreach(println)
>>
>>> // this statement saves the CSV file, but only the column names from the select above are being saved
>>> selecteddata.write.format("com.databricks.spark.csv").option("header", "true").save("/tmp/JoinedData.csv")
>>
>> Would really appreciate pointers/help.
>>
>> Thanks,
>> Divya
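In the spirit of Ted's question, a minimal check (a sketch using the names from the quoted code, assuming the same session) is to inspect selecteddata itself before writing it, since it can differ from joineddf:

scala> selecteddata.collect.foreach(println)   // rows of [COLUMN2, COLUMN10]
scala> selecteddata.count                      // 0 here would explain a header-only CSV

If joineddf prints rows but selecteddata.count is 0, the problem lies in the select rather than in the CSV writer; note also that firstSchema as quoted declares COLUMN2 twice, which can make the reference file1df("COLUMN2") ambiguous.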
[Attachment: part-00000 (binary data)]