Repository: spark
Updated Branches:
  refs/heads/master ff48b1b33 -> 9348e6842
[SPARK-22833][EXAMPLE] Improve SparkHive Scala Examples

## What changes were proposed in this pull request?

Some improvements:
1. Point out that we are using both Spark SQL native syntax and HQL syntax in the example.
2. Avoid using the same table name as the temp view, so users are not confused.
3. Create the external Hive table over a directory that already has data, which is the more common use case.
4. Remove the usage of `spark.sql.parquet.writeLegacyFormat`. This config was introduced by https://github.com/apache/spark/pull/8566 and has nothing to do with Hive.
5. Remove the `repartition` and `coalesce` examples. These two are not Hive specific; we should put them in a different example file. Besides, they cannot accurately control the number of output files, since `spark.sql.files.maxRecordsPerFile` also controls it (see the sketch at the end of this message).

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #20081 from cloud-fan/minor.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9348e684
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9348e684
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9348e684

Branch: refs/heads/master
Commit: 9348e684208465a8f75c893bdeaa30fc42c0cb5f
Parents: ff48b1b
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Tue Dec 26 09:37:39 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Dec 26 09:37:39 2017 -0800

----------------------------------------------------------------------
 .../examples/sql/hive/SparkHiveExample.scala    | 75 ++++++++++++--------
 .../org/apache/spark/sql/internal/SQLConf.scala |  4 +-
 2 files changed, 46 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9348e684/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
index b193bd5..70fb5b2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -102,40 +102,53 @@ object SparkHiveExample {
     // |  5| val_5|  5| val_5|
     // ...
 
-    // Create Hive managed table with Parquet
-    sql("CREATE TABLE records(key int, value string) STORED AS PARQUET")
-    // Save DataFrame to Hive managed table as Parquet format
-    val hiveTableDF = sql("SELECT * FROM records")
-    hiveTableDF.write.mode(SaveMode.Overwrite).saveAsTable("database_name.records")
-    // Create External Hive table with Parquet
-    sql("CREATE EXTERNAL TABLE records(key int, value string) " +
-      "STORED AS PARQUET LOCATION '/user/hive/warehouse/'")
-    // to make Hive Parquet format compatible with Spark Parquet format
-    spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
-
-    // Multiple Parquet files could be created accordingly to volume of data under directory given.
-    val hiveExternalTableLocation = "/user/hive/warehouse/database_name.db/records"
-
-    // Save DataFrame to Hive External table as compatible Parquet format
-    hiveTableDF.write.mode(SaveMode.Overwrite).parquet(hiveExternalTableLocation)
-
-    // Turn on flag for Dynamic Partitioning
-    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
-    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
-
-    // You can create partitions in Hive table, so downstream queries run much faster.
-    hiveTableDF.write.mode(SaveMode.Overwrite).partitionBy("key")
-      .parquet(hiveExternalTableLocation)
+    // Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
+    // `USING hive`
+    sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
+    // Save DataFrame to the Hive managed table
+    val df = spark.table("src")
+    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
+    // After insertion, the Hive managed table has data now
+    sql("SELECT * FROM hive_records").show()
+    // +---+-------+
+    // |key|  value|
+    // +---+-------+
+    // |238|val_238|
+    // | 86| val_86|
+    // |311|val_311|
+    // ...
 
-    // Reduce number of files for each partition by repartition
-    hiveTableDF.repartition($"key").write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
+    // Prepare a Parquet data directory
+    val dataDir = "/tmp/parquet_data"
+    spark.range(10).write.parquet(dataDir)
+    // Create a Hive external Parquet table
+    sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
+    // The Hive external table should already have data
+    sql("SELECT * FROM hive_ints").show()
+    // +---+
+    // |key|
+    // +---+
+    // |  0|
+    // |  1|
+    // |  2|
+    // ...
 
-    // Control the number of files in each partition by coalesce
-    hiveTableDF.coalesce(10).write.mode(SaveMode.Overwrite)
-      .partitionBy("key").parquet(hiveExternalTableLocation)
-    // $example off:spark_hive$
+    // Turn on flag for Hive Dynamic Partitioning
+    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
+    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
+    // Create a Hive partitioned table using DataFrame API
+    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
+    // Partitioned column `key` will be moved to the end of the schema.
+    sql("SELECT * FROM hive_part_tbl").show()
+    // +-------+---+
+    // |  value|key|
+    // +-------+---+
+    // |val_238|238|
+    // | val_86| 86|
+    // |val_311|311|
+    // ...
 
     spark.stop()
+    // $example off:spark_hive$
   }
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/9348e684/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 84fe4bb..f16972e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -336,8 +336,8 @@ object SQLConf {
     .createWithDefault(true)
 
   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
-    .doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
-      "Spark SQL schema and vice versa.")
+    .doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
+      "versions, when converting Parquet schema to Spark SQL schema and vice versa.")
     .booleanConf
     .createWithDefault(false)
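As a companion to points 1 and 5 of the description above, here is a minimal sketch assuming a Hive-enabled SparkSession. The object name `HiveSyntaxSketch`, the table names `hql_tbl` and `native_tbl`, the path `/tmp/max_records_demo`, and the row counts are illustrative placeholders, not part of this patch; the `USING hive` DDL and the `spark.sql.files.maxRecordsPerFile` behaviour shown are generic Spark features rather than anything introduced here.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch only: table names, path, and counts are placeholders.
object HiveSyntaxSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HiveSyntaxSketch")
      .enableHiveSupport()
      .getOrCreate()

    // HQL syntax: Hive DDL with STORED AS, handled through Hive's Parquet SerDe.
    spark.sql("CREATE TABLE IF NOT EXISTS hql_tbl(key INT, value STRING) STORED AS PARQUET")

    // Spark SQL native syntax: the same intent expressed with a USING clause.
    spark.sql(
      "CREATE TABLE IF NOT EXISTS native_tbl(key INT, value STRING) " +
        "USING hive OPTIONS(fileFormat 'parquet')")

    // repartition/coalesce alone cannot pin down the number of output files:
    // once a file reaches spark.sql.files.maxRecordsPerFile records, the writer
    // rolls over to a new file even within a single partition.
    spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000L)
    spark.range(10000)
      .coalesce(1)                        // a single partition...
      .write.mode("overwrite")
      .parquet("/tmp/max_records_demo")   // ...still yields ~10 files of at most 1000 rows each

    spark.stop()
  }
}
```

Run against a Hive-enabled build (spark-shell or spark-submit), this creates equivalent Parquet-backed tables through both DDL dialects and writes an output directory containing roughly ten files even though the data was coalesced to one partition, which is why the removed `repartition`/`coalesce` snippets could not guarantee a file count on their own.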