This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ebc358c  [SPARK-31086][SQL] Add Back the Deprecated SQLContext methods
ebc358c is described below

commit ebc358c8d2b6d67c7319be006452c9c993b7a098
Author: gatorsmile <gatorsm...@gmail.com>
AuthorDate: Thu Mar 26 23:49:24 2020 -0700

    [SPARK-31086][SQL] Add Back the Deprecated SQLContext methods
    
    ### What changes were proposed in this pull request?
    
    Based on the discussion in the mailing list [[Proposal] Modification to 
Spark's Semantic Versioning 
Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html)
 , this PR is to add back the following APIs whose maintenance cost are 
relatively small.
    
    - SQLContext.applySchema
    - SQLContext.parquetFile
    - SQLContext.jsonFile
    - SQLContext.jsonRDD
    - SQLContext.load
    - SQLContext.jdbc
    
    ### Why are the changes needed?
    Avoid breaking the APIs that are commonly used.
    
    ### Does this PR introduce any user-facing change?
    Adding back the APIs that were removed in 3.0 branch does not introduce the 
user-facing changes, because Spark 3.0 has not been released.
    
    ### How was this patch tested?
    The existing tests.
    
    Closes #27839 from gatorsmile/addAPIBackV3.
    
    Lead-authored-by: gatorsmile <gatorsm...@gmail.com>
    Co-authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
    (cherry picked from commit b7e4cc775b7eac68606d1f385911613f5139db1b)
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../scala/org/apache/spark/sql/SQLContext.scala    | 283 +++++++++++++++++++++
 .../org/apache/spark/sql/DeprecatedAPISuite.scala  | 106 ++++++++
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      |  14 +
 3 files changed, 403 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 2054874..592c64c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -611,6 +611,289 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
     sessionState.catalog.listTables(databaseName).map(_.table).toArray
   }
 
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+  // Deprecated methods
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   */
+  @deprecated("Use createDataFrame instead.", "1.3.0")
+  def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
+    createDataFrame(rowRDD, schema)
+  }
+
+  /**
+   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   */
+  @deprecated("Use createDataFrame instead.", "1.3.0")
+  def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
+    createDataFrame(rowRDD, schema)
+  }
+
+  /**
+   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   */
+  @deprecated("Use createDataFrame instead.", "1.3.0")
+  def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
+    createDataFrame(rdd, beanClass)
+  }
+
+  /**
+   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   */
+  @deprecated("Use createDataFrame instead.", "1.3.0")
+  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
+    createDataFrame(rdd, beanClass)
+  }
+
+  /**
+   * Loads a Parquet file, returning the result as a `DataFrame`. This 
function returns an empty
+   * `DataFrame` if no paths are passed in.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().parquet()`.
+   */
+  @deprecated("Use read.parquet() instead.", "1.4.0")
+  @scala.annotation.varargs
+  def parquetFile(paths: String*): DataFrame = {
+    if (paths.isEmpty) {
+      emptyDataFrame
+    } else {
+      read.parquet(paths : _*)
+    }
+  }
+
+  /**
+   * Loads a JSON file (one object per line), returning the result as a 
`DataFrame`.
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonFile(path: String): DataFrame = {
+    read.json(path)
+  }
+
+  /**
+   * Loads a JSON file (one object per line) and applies the given schema,
+   * returning the result as a `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonFile(path: String, schema: StructType): DataFrame = {
+    read.schema(schema).json(path)
+  }
+
+  /**
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonFile(path: String, samplingRatio: Double): DataFrame = {
+    read.option("samplingRatio", samplingRatio.toString).json(path)
+  }
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record), 
returning the result as a
+   * `DataFrame`.
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record), 
returning the result as a
+   * `DataFrame`.
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record) and 
applies the given schema,
+   * returning the result as a `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
+    read.schema(schema).json(json)
+  }
+
+  /**
+   * Loads an JavaRDD[String] storing JSON objects (one object per record) and 
applies the given
+   * schema, returning the result as a `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
+    read.schema(schema).json(json)
+  }
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record) 
inferring the
+   * schema, returning the result as a `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
+    read.option("samplingRatio", samplingRatio.toString).json(json)
+  }
+
+  /**
+   * Loads a JavaRDD[String] storing JSON objects (one object per record) 
inferring the
+   * schema, returning the result as a `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
+    read.option("samplingRatio", samplingRatio.toString).json(json)
+  }
+
+  /**
+   * Returns the dataset stored at path as a DataFrame,
+   * using the default data source configured by spark.sql.sources.default.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by `read().load(path)`.
+   */
+  @deprecated("Use read.load(path) instead.", "1.4.0")
+  def load(path: String): DataFrame = {
+    read.load(path)
+  }
+
+  /**
+   * Returns the dataset stored at path as a DataFrame, using the given data 
source.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by `read().format(source).load(path)`.
+   */
+  @deprecated("Use read.format(source).load(path) instead.", "1.4.0")
+  def load(path: String, source: String): DataFrame = {
+    read.format(source).load(path)
+  }
+
+  /**
+   * (Java-specific) Returns the dataset specified by the given data source and
+   * a set of options as a DataFrame.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by 
`read().format(source).options(options).load()`.
+   */
+  @deprecated("Use read.format(source).options(options).load() instead.", 
"1.4.0")
+  def load(source: String, options: java.util.Map[String, String]): DataFrame 
= {
+    read.options(options).format(source).load()
+  }
+
+  /**
+   * (Scala-specific) Returns the dataset specified by the given data source 
and
+   * a set of options as a DataFrame.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by 
`read().format(source).options(options).load()`.
+   */
+  @deprecated("Use read.format(source).options(options).load() instead.", 
"1.4.0")
+  def load(source: String, options: Map[String, String]): DataFrame = {
+    read.options(options).format(source).load()
+  }
+
+  /**
+   * (Java-specific) Returns the dataset specified by the given data source and
+   * a set of options as a DataFrame, using the given schema as the schema of 
the DataFrame.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by
+   *            `read().format(source).schema(schema).options(options).load()`.
+   */
+  @deprecated("Use read.format(source).schema(schema).options(options).load() 
instead.", "1.4.0")
+  def load(
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame = {
+    read.format(source).schema(schema).options(options).load()
+  }
+
+  /**
+   * (Scala-specific) Returns the dataset specified by the given data source 
and
+   * a set of options as a DataFrame, using the given schema as the schema of 
the DataFrame.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by
+   *            `read().format(source).schema(schema).options(options).load()`.
+   */
+  @deprecated("Use read.format(source).schema(schema).options(options).load() 
instead.", "1.4.0")
+  def load(source: String, schema: StructType, options: Map[String, String]): 
DataFrame = {
+    read.format(source).schema(schema).options(options).load()
+  }
+
+  /**
+   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
+   * url named table.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+   */
+  @deprecated("Use read.jdbc() instead.", "1.4.0")
+  def jdbc(url: String, table: String): DataFrame = {
+    read.jdbc(url, table, new Properties)
+  }
+
+  /**
+   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
+   * url named table.  Partitions of the table will be retrieved in parallel 
based on the parameters
+   * passed to this function.
+   *
+   * @param columnName the name of a column of integral type that will be used 
for partitioning.
+   * @param lowerBound the minimum value of `columnName` used to decide 
partition stride
+   * @param upperBound the maximum value of `columnName` used to decide 
partition stride
+   * @param numPartitions the number of partitions.  the range 
`minValue`-`maxValue` will be split
+   *                      evenly into this many partitions
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+   */
+  @deprecated("Use read.jdbc() instead.", "1.4.0")
+  def jdbc(
+      url: String,
+      table: String,
+      columnName: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int): DataFrame = {
+    read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, 
new Properties)
+  }
+
+  /**
+   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
+   * url named table. The theParts parameter gives a list expressions
+   * suitable for inclusion in WHERE clauses; each one defines one partition
+   * of the `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+   */
+  @deprecated("Use read.jdbc() instead.", "1.4.0")
+  def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
+    read.jdbc(url, table, theParts, new Properties)
+  }
 }
 
 /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
new file mode 100644
index 0000000..c31ef99
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
+
+class DeprecatedAPISuite extends QueryTest with SharedSparkSession {
+
+  test("SQLContext.applySchema") {
+    val rowRdd = sparkContext.parallelize(Seq(Row("Jack", 20), Row("Marry", 
18)))
+    val schema = StructType(StructField("name", StringType, false) ::
+      StructField("age", IntegerType, true) :: Nil)
+    val sqlContext = spark.sqlContext
+    checkAnswer(sqlContext.applySchema(rowRdd, schema), Row("Jack", 20) :: 
Row("Marry", 18) :: Nil)
+    checkAnswer(sqlContext.applySchema(rowRdd.toJavaRDD(), schema),
+      Row("Jack", 20) :: Row("Marry", 18) :: Nil)
+  }
+
+  test("SQLContext.parquetFile") {
+    val sqlContext = spark.sqlContext
+    withTempDir { dir =>
+      val parquetFile = s"${dir.toString}/${System.currentTimeMillis()}"
+      val expectDF = spark.range(10).toDF()
+      expectDF.write.parquet(parquetFile)
+      val parquetDF = sqlContext.parquetFile(parquetFile)
+      checkAnswer(parquetDF, expectDF)
+    }
+  }
+
+  test("SQLContext.jsonFile") {
+    val sqlContext = spark.sqlContext
+    withTempDir { dir =>
+      val jsonFile = s"${dir.toString}/${System.currentTimeMillis()}"
+      val expectDF = spark.range(10).toDF()
+      expectDF.write.json(jsonFile)
+      var jsonDF = sqlContext.jsonFile(jsonFile)
+      checkAnswer(jsonDF, expectDF)
+      assert(jsonDF.schema === expectDF.schema.asNullable)
+
+      var schema = expectDF.schema
+      jsonDF = sqlContext.jsonFile(jsonFile, schema)
+      checkAnswer(jsonDF, expectDF)
+      assert(jsonDF.schema === schema.asNullable)
+
+      jsonDF = sqlContext.jsonFile(jsonFile, 0.9)
+      checkAnswer(jsonDF, expectDF)
+
+      val jsonRDD = 
sparkContext.parallelize(Seq("{\"name\":\"Jack\",\"age\":20}",
+        "{\"name\":\"Marry\",\"age\":18}"))
+      jsonDF = sqlContext.jsonRDD(jsonRDD)
+      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+      jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD())
+      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+
+      schema = StructType(StructField("name", StringType, false) ::
+        StructField("age", IntegerType, false) :: Nil)
+      jsonDF = sqlContext.jsonRDD(jsonRDD, schema)
+      checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil)
+      jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), schema)
+      checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil)
+
+
+      jsonDF = sqlContext.jsonRDD(jsonRDD, 0.9)
+      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+      jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), 0.9)
+      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+    }
+  }
+
+  test("SQLContext.load") {
+    withTempDir { dir =>
+      val path = s"${dir.toString}/${System.currentTimeMillis()}"
+      val expectDF = spark.range(10).toDF()
+      expectDF.write.parquet(path)
+      val sqlContext = spark.sqlContext
+
+      var loadDF = sqlContext.load(path)
+      checkAnswer(loadDF, expectDF)
+
+      loadDF = sqlContext.load(path, "parquet")
+      checkAnswer(loadDF, expectDF)
+
+      loadDF = sqlContext.load("parquet", Map("path" -> path))
+      checkAnswer(loadDF, expectDF)
+
+      loadDF = sqlContext.load("parquet", expectDF.schema, Map("path" -> path))
+      checkAnswer(loadDF, expectDF)
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9cba95f..fd691f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1699,4 +1699,18 @@ class JDBCSuite extends QueryTest
     assert(JdbcDialects.get("jdbc:teradata://localhost/db") === 
TeradataDialect)
     assert(JdbcDialects.get("jdbc:Teradata://localhost/db") === 
TeradataDialect)
   }
+
+  test("SQLContext.jdbc (deprecated)") {
+    val sqlContext = spark.sqlContext
+    var jdbcDF = sqlContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE")
+    checkAnswer(jdbcDF, Row("fred", 1) :: Row("mary", 2) :: Row ("joe 'foo' 
\"bar\"", 3) :: Nil)
+
+    jdbcDF = sqlContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 
3)
+    checkNumPartitions(jdbcDF, 3)
+    checkAnswer(jdbcDF, Row("fred", 1) :: Row("mary", 2) :: Row ("joe 'foo' 
\"bar\"", 3) :: Nil)
+
+    val parts = Array[String]("THEID = 2")
+    jdbcDF = sqlContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts)
+    checkAnswer(jdbcDF, Row("mary", 2) :: Nil)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to