This is an automated email from the ASF dual-hosted git repository.

tdas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 26ed65f  [SPARK-27453] Pass partitionBy as options in DataFrameWriter
26ed65f is described below

commit 26ed65f4150db1fa37f8bfab24ac0873d2e42936
Author: liwensun <liwen....@databricks.com>
AuthorDate: Tue Apr 16 15:03:16 2019 -0700

    [SPARK-27453] Pass partitionBy as options in DataFrameWriter
    
    ## What changes were proposed in this pull request?
    
    Pass partitionBy columns as options and feature-flag this behavior.
    
    ## How was this patch tested?
    
    A new unit test.
    
    Closes #24365 from liwensun/partitionby.
    
    Authored-by: liwensun <liwen....@databricks.com>
    Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala      |  9 +++++++++
 .../scala/org/apache/spark/sql/DataFrameWriter.scala | 11 ++++++++++-
 .../sql/execution/datasources/DataSourceUtils.scala  | 20 ++++++++++++++++++++
 .../spark/sql/test/DataFrameReaderWriterSuite.scala  | 19 +++++++++++++++++++
 4 files changed, 58 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3f59fa1..b223a48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1687,6 +1687,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_PASS_PARTITION_BY_AS_OPTIONS =
+    buildConf("spark.sql.legacy.sources.write.passPartitionByAsOptions")
+      .internal()
+      .doc("Whether to pass the partitionBy columns as options in 
DataFrameWriter. " +
+        "Data source V1 now silently drops partitionBy columns for 
non-file-format sources; " +
+        "turning the flag on provides a way for these sources to see these 
partitionBy columns.")
+      .booleanConf
+      .createWithDefault(false)
+
   val NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE =
     buildConf("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9371936..3b84151 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -29,8 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, 
InsertIntoTable, LogicalPlan, OverwriteByExpression}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, 
DataSourceUtils, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.TableCapability._
@@ -313,6 +314,14 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
   }
 
   private def saveToV1Source(): Unit = {
+    if (SparkSession.active.sessionState.conf.getConf(
+      SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS)) {
+      partitioningColumns.foreach { columns =>
+        extraOptions += (DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
+          DataSourceUtils.encodePartitioningColumns(columns))
+      }
+    }
+
     // Code path for data source v1.
     runCommand(df.sparkSession, "save") {
       DataSource(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 74eae94..0ad914e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.hadoop.fs.Path
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.types._
@@ -25,6 +27,24 @@ import org.apache.spark.sql.types._
 
 object DataSourceUtils {
   /**
+   * The key to use for storing partitionBy columns as options.
+   */
+  val PARTITIONING_COLUMNS_KEY = "__partition_columns"
+
+  /**
+   * Utility methods for converting partitionBy columns to options and back.
+   */
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  def encodePartitioningColumns(columns: Seq[String]): String = {
+    Serialization.write(columns)
+  }
+
+  def decodePartitioningColumns(str: String): Seq[String] = {
+    Serialization.read[Seq[String]](str)
+  }
+
+  /**
    * Verify if the schema is supported in datasource. This verification should 
be done
    * in a driver side.
    */
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 9f96947..d34da33 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -38,6 +38,7 @@ import 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import 
org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -219,6 +220,24 @@ class DataFrameReaderWriterSuite extends QueryTest with 
SharedSQLContext with Be
     assert(LastOptions.parameters("opt3") == "3")
   }
 
+  test("pass partitionBy as options") {
+    Seq(true, false).foreach { flag =>
+      withSQLConf(SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS.key -> s"$flag") 
{
+        Seq(1).toDF.write
+          .format("org.apache.spark.sql.test")
+          .partitionBy("col1", "col2")
+          .save()
+
+        if (flag) {
+          val partColumns = 
LastOptions.parameters(DataSourceUtils.PARTITIONING_COLUMNS_KEY)
+          assert(DataSourceUtils.decodePartitioningColumns(partColumns) === 
Seq("col1", "col2"))
+        } else {
+          
assert(!LastOptions.parameters.contains(DataSourceUtils.PARTITIONING_COLUMNS_KEY))
+        }
+      }
+    }
+  }
+
   test("save mode") {
     val df = spark.read
       .format("org.apache.spark.sql.test")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to