Repository: spark
Updated Branches:
  refs/heads/master ff48b1b33 -> 9348e6842


[SPARK-22833][EXAMPLE] Improve SparkHive Scala Examples

## What changes were proposed in this pull request?
Some improvements:
1. Point out that we are using both Spark SQL native syntax and HQL syntax in the example (see the sketch after this list).
2. Avoid using the same table name as the temp view, so users are not confused.
3. Create the external Hive table from a directory that already has data, which is a more common use case.
4. Remove the usage of `spark.sql.parquet.writeLegacyFormat`. This config was 
introduced by https://github.com/apache/spark/pull/8566 and has nothing to do 
with Hive.
5. Remove the `repartition` and `coalesce` examples. These two are not Hive specific and should go in a different example file. Besides, they cannot accurately control the number of output files, since `spark.sql.files.maxRecordsPerFile` also affects it.
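
For reference, a minimal sketch (not part of this commit) contrasting the two `CREATE TABLE` syntaxes mentioned in point 1, plus the config noted in point 5. The table names, app name, and the `maxRecordsPerFile` value are made up for illustration; the Hive statements require a session built with `enableHiveSupport()`:

```scala
import org.apache.spark.sql.SparkSession

object HiveTableSyntaxSketch {
  def main(args: Array[String]): Unit = {
    // Hive support is required for both CREATE TABLE statements below
    val spark = SparkSession.builder()
      .appName("Hive table syntax sketch")
      .enableHiveSupport()
      .getOrCreate()

    // HQL syntax, as used in the updated example
    spark.sql("CREATE TABLE hql_records(key INT, value STRING) STORED AS PARQUET")

    // Spark SQL native syntax for the same kind of Hive Parquet serde table
    spark.sql(
      "CREATE TABLE native_records(key INT, value STRING) USING hive OPTIONS(fileFormat 'parquet')")

    // Point 5: the number of output files is also bounded by this config,
    // not just by repartition/coalesce (value here is arbitrary)
    spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000L)

    spark.stop()
  }
}
```

The `USING hive OPTIONS(fileFormat 'parquet')` form is the Spark SQL native syntax for declaring a Hive serde table, which is what the new comment in the example refers to.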

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #20081 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9348e684
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9348e684
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9348e684

Branch: refs/heads/master
Commit: 9348e684208465a8f75c893bdeaa30fc42c0cb5f
Parents: ff48b1b
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Tue Dec 26 09:37:39 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Dec 26 09:37:39 2017 -0800

----------------------------------------------------------------------
 .../examples/sql/hive/SparkHiveExample.scala    | 75 ++++++++++++--------
 .../org/apache/spark/sql/internal/SQLConf.scala |  4 +-
 2 files changed, 46 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9348e684/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
index b193bd5..70fb5b2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -102,40 +102,53 @@ object SparkHiveExample {
     // |  5| val_5|  5| val_5|
     // ...
 
-    // Create Hive managed table with Parquet
-    sql("CREATE TABLE records(key int, value string) STORED AS PARQUET")
-    // Save DataFrame to Hive managed table as Parquet format
-    val hiveTableDF = sql("SELECT * FROM records")
-    hiveTableDF.write.mode(SaveMode.Overwrite).saveAsTable("database_name.records")
-    // Create External Hive table with Parquet
-    sql("CREATE EXTERNAL TABLE records(key int, value string) " +
-      "STORED AS PARQUET LOCATION '/user/hive/warehouse/'")
-    // to make Hive Parquet format compatible with Spark Parquet format
-    spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
-
-    // Multiple Parquet files could be created accordingly to volume of data under directory given.
-    val hiveExternalTableLocation = "/user/hive/warehouse/database_name.db/records"
-
-    // Save DataFrame to Hive External table as compatible Parquet format
-    hiveTableDF.write.mode(SaveMode.Overwrite).parquet(hiveExternalTableLocation)
-
-    // Turn on flag for Dynamic Partitioning
-    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
-    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
-
-    // You can create partitions in Hive table, so downstream queries run much faster.
-    hiveTableDF.write.mode(SaveMode.Overwrite).partitionBy("key")
-      .parquet(hiveExternalTableLocation)
+    // Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
+    // `USING hive`
+    sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
+    // Save DataFrame to the Hive managed table
+    val df = spark.table("src")
+    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
+    // After insertion, the Hive managed table has data now
+    sql("SELECT * FROM hive_records").show()
+    // +---+-------+
+    // |key|  value|
+    // +---+-------+
+    // |238|val_238|
+    // | 86| val_86|
+    // |311|val_311|
+    // ...
 
-    // Reduce number of files for each partition by repartition
-    hiveTableDF.repartition($"key").write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
+    // Prepare a Parquet data directory
+    val dataDir = "/tmp/parquet_data"
+    spark.range(10).write.parquet(dataDir)
+    // Create a Hive external Parquet table
+    sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION 
'$dataDir'")
+    // The Hive external table should already have data
+    sql("SELECT * FROM hive_ints").show()
+    // +---+
+    // |key|
+    // +---+
+    // |  0|
+    // |  1|
+    // |  2|
+    // ...
 
-    // Control the number of files in each partition by coalesce
-    hiveTableDF.coalesce(10).write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
-    // $example off:spark_hive$
+    // Turn on flag for Hive Dynamic Partitioning
+    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
+    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
+    // Create a Hive partitioned table using DataFrame API
+    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
+    // Partitioned column `key` will be moved to the end of the schema.
+    sql("SELECT * FROM hive_part_tbl").show()
+    // +-------+---+
+    // |  value|key|
+    // +-------+---+
+    // |val_238|238|
+    // | val_86| 86|
+    // |val_311|311|
+    // ...
 
     spark.stop()
+    // $example off:spark_hive$
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9348e684/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 84fe4bb..f16972e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -336,8 +336,8 @@ object SQLConf {
     .createWithDefault(true)
 
   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
-    .doc("Whether to follow Parquet's format specification when converting 
Parquet schema to " +
-      "Spark SQL schema and vice versa.")
+    .doc("Whether to be compatible with the legacy Parquet format adopted by 
Spark 1.4 and prior " +
+      "versions, when converting Parquet schema to Spark SQL schema and vice 
versa.")
     .booleanConf
     .createWithDefault(false)
 

