This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ebc358c  [SPARK-31086][SQL] Add Back the Deprecated SQLContext methods
ebc358c is described below

commit ebc358c8d2b6d67c7319be006452c9c993b7a098
Author: gatorsmile <gatorsm...@gmail.com>
AuthorDate: Thu Mar 26 23:49:24 2020 -0700

    [SPARK-31086][SQL] Add Back the Deprecated SQLContext methods

    ### What changes were proposed in this pull request?
    Based on the discussion in the mailing list [[Proposal] Modification to Spark's Semantic Versioning Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html), this PR adds back the following APIs, whose maintenance cost is relatively small.

    - SQLContext.applySchema
    - SQLContext.parquetFile
    - SQLContext.jsonFile
    - SQLContext.jsonRDD
    - SQLContext.load
    - SQLContext.jdbc

    ### Why are the changes needed?
    Avoid breaking APIs that are commonly used.

    ### Does this PR introduce any user-facing change?
    Adding back the APIs that were removed in the 3.0 branch does not introduce user-facing changes, because Spark 3.0 has not been released yet.

    ### How was this patch tested?
    The existing tests.

    Closes #27839 from gatorsmile/addAPIBackV3.

    Lead-authored-by: gatorsmile <gatorsm...@gmail.com>
    Co-authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
    (cherry picked from commit b7e4cc775b7eac68606d1f385911613f5139db1b)
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../scala/org/apache/spark/sql/SQLContext.scala    | 283 +++++++++++++++++++++
 .../org/apache/spark/sql/DeprecatedAPISuite.scala  | 106 ++++++++
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      |  14 +
 3 files changed, 403 insertions(+)
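For quick reference, a minimal sketch of how the restored methods map onto their non-deprecated replacements (each deprecated method simply delegates, as the patch below shows). This assumes a spark-shell style SparkSession named `spark`; the `path`, `schema`, and `rowRdd` values are illustrative placeholders, not part of the patch:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val sqlContext = spark.sqlContext
    val path = "/tmp/example"                        // hypothetical path
    val schema = StructType(StructField("name", StringType, true) ::
      StructField("age", IntegerType, true) :: Nil)
    val rowRdd: RDD[Row] = spark.sparkContext.parallelize(Seq(Row("Jack", 20)))

    sqlContext.applySchema(rowRdd, schema)  // delegates to spark.createDataFrame(rowRdd, schema)
    sqlContext.parquetFile(path)            // delegates to spark.read.parquet(path)
    sqlContext.jsonFile(path, schema)       // delegates to spark.read.schema(schema).json(path)
    sqlContext.load(path, "parquet")        // delegates to spark.read.format("parquet").load(path)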
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 2054874..592c64c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -611,6 +611,289 @@ class SQLContext private[sql](val sparkSession: SparkSession)
     sessionState.catalog.listTables(databaseName).map(_.table).toArray
   }
 
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+  // Deprecated methods
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   */
+  @deprecated("Use createDataFrame instead.", "1.3.0")
+  def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
+    createDataFrame(rowRDD, schema)
+  }
+
+  /**
+   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   */
+  @deprecated("Use createDataFrame instead.", "1.3.0")
+  def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
+    createDataFrame(rowRDD, schema)
+  }
+
+  /**
+   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   */
+  @deprecated("Use createDataFrame instead.", "1.3.0")
+  def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
+    createDataFrame(rdd, beanClass)
+  }
+
+  /**
+   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   */
+  @deprecated("Use createDataFrame instead.", "1.3.0")
+  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
+    createDataFrame(rdd, beanClass)
+  }
+
+  /**
+   * Loads a Parquet file, returning the result as a `DataFrame`. This function returns an empty
+   * `DataFrame` if no paths are passed in.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().parquet()`.
+   */
+  @deprecated("Use read.parquet() instead.", "1.4.0")
+  @scala.annotation.varargs
+  def parquetFile(paths: String*): DataFrame = {
+    if (paths.isEmpty) {
+      emptyDataFrame
+    } else {
+      read.parquet(paths : _*)
+    }
+  }
+
+  /**
+   * Loads a JSON file (one object per line), returning the result as a `DataFrame`.
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonFile(path: String): DataFrame = {
+    read.json(path)
+  }
+
+  /**
+   * Loads a JSON file (one object per line) and applies the given schema,
+   * returning the result as a `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonFile(path: String, schema: StructType): DataFrame = {
+    read.schema(schema).json(path)
+  }
+
+  /**
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonFile(path: String, samplingRatio: Double): DataFrame = {
+    read.option("samplingRatio", samplingRatio.toString).json(path)
+  }
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+   * `DataFrame`.
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+   * `DataFrame`.
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record) and applies the given
+   * schema, returning the result as a `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
+    read.schema(schema).json(json)
+  }
+
+  /**
+   * Loads a JavaRDD[String] storing JSON objects (one object per record) and applies the given
+   * schema, returning the result as a `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
+    read.schema(schema).json(json)
+  }
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record) inferring the
+   * schema, returning the result as a `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
+    read.option("samplingRatio", samplingRatio.toString).json(json)
+  }
+
+  /**
+   * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
+   * schema, returning the result as a `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   */
+  @deprecated("Use read.json() instead.", "1.4.0")
+  def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
+    read.option("samplingRatio", samplingRatio.toString).json(json)
+  }
+
+  /**
+   * Returns the dataset stored at path as a DataFrame,
+   * using the default data source configured by spark.sql.sources.default.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by `read().load(path)`.
+   */
+  @deprecated("Use read.load(path) instead.", "1.4.0")
+  def load(path: String): DataFrame = {
+    read.load(path)
+  }
+
+  /**
+   * Returns the dataset stored at path as a DataFrame, using the given data source.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by `read().format(source).load(path)`.
+   */
+  @deprecated("Use read.format(source).load(path) instead.", "1.4.0")
+  def load(path: String, source: String): DataFrame = {
+    read.format(source).load(path)
+  }
+
+  /**
+   * (Java-specific) Returns the dataset specified by the given data source and
+   * a set of options as a DataFrame.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by `read().format(source).options(options).load()`.
+   */
+  @deprecated("Use read.format(source).options(options).load() instead.", "1.4.0")
+  def load(source: String, options: java.util.Map[String, String]): DataFrame = {
+    read.options(options).format(source).load()
+  }
+
+  /**
+   * (Scala-specific) Returns the dataset specified by the given data source and
+   * a set of options as a DataFrame.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by `read().format(source).options(options).load()`.
+   */
+  @deprecated("Use read.format(source).options(options).load() instead.", "1.4.0")
+  def load(source: String, options: Map[String, String]): DataFrame = {
+    read.options(options).format(source).load()
+  }
+
+  /**
+   * (Java-specific) Returns the dataset specified by the given data source and
+   * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by
+   *             `read().format(source).schema(schema).options(options).load()`.
+   */
+  @deprecated("Use read.format(source).schema(schema).options(options).load() instead.", "1.4.0")
+  def load(
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame = {
+    read.format(source).schema(schema).options(options).load()
+  }
+
+  /**
+   * (Scala-specific) Returns the dataset specified by the given data source and
+   * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+   *
+   * @group genericdata
+   * @deprecated As of 1.4.0, replaced by
+   *             `read().format(source).schema(schema).options(options).load()`.
+   */
+  @deprecated("Use read.format(source).schema(schema).options(options).load() instead.", "1.4.0")
+  def load(source: String, schema: StructType, options: Map[String, String]): DataFrame = {
+    read.format(source).schema(schema).options(options).load()
+  }
+
+  /**
+   * Construct a `DataFrame` representing the database table accessible via the JDBC URL
+   * `url`, named `table`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+   */
+  @deprecated("Use read.jdbc() instead.", "1.4.0")
+  def jdbc(url: String, table: String): DataFrame = {
+    read.jdbc(url, table, new Properties)
+  }
+
+  /**
+   * Construct a `DataFrame` representing the database table accessible via the JDBC URL
+   * `url`, named `table`. Partitions of the table will be retrieved in parallel based on the
+   * parameters passed to this function.
+   *
+   * @param columnName the name of a column of integral type that will be used for partitioning.
+   * @param lowerBound the minimum value of `columnName` used to decide partition stride
+   * @param upperBound the maximum value of `columnName` used to decide partition stride
+   * @param numPartitions the number of partitions. The range `lowerBound`-`upperBound` will be
+   *                      split evenly into this many partitions
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+   */
+  @deprecated("Use read.jdbc() instead.", "1.4.0")
+  def jdbc(
+      url: String,
+      table: String,
+      columnName: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int): DataFrame = {
+    read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties)
+  }
+
+  /**
+   * Construct a `DataFrame` representing the database table accessible via the JDBC URL
+   * `url`, named `table`. The `theParts` parameter gives a list of expressions suitable for
+   * inclusion in WHERE clauses; each one defines one partition of the `DataFrame`.
+   *
+   * @group specificdata
+   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+   */
+  @deprecated("Use read.jdbc() instead.", "1.4.0")
+  def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
+    read.jdbc(url, table, theParts, new Properties)
+  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
new file mode 100644
index 0000000..c31ef99
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+
+class DeprecatedAPISuite extends QueryTest with SharedSparkSession {
+
+  test("SQLContext.applySchema") {
+    val rowRdd = sparkContext.parallelize(Seq(Row("Jack", 20), Row("Marry", 18)))
+    val schema = StructType(StructField("name", StringType, false) ::
+      StructField("age", IntegerType, true) :: Nil)
+    val sqlContext = spark.sqlContext
+    checkAnswer(sqlContext.applySchema(rowRdd, schema), Row("Jack", 20) :: Row("Marry", 18) :: Nil)
+    checkAnswer(sqlContext.applySchema(rowRdd.toJavaRDD(), schema),
+      Row("Jack", 20) :: Row("Marry", 18) :: Nil)
+  }
+
+  test("SQLContext.parquetFile") {
+    val sqlContext = spark.sqlContext
+    withTempDir { dir =>
+      val parquetFile = s"${dir.toString}/${System.currentTimeMillis()}"
+      val expectDF = spark.range(10).toDF()
+      expectDF.write.parquet(parquetFile)
+      val parquetDF = sqlContext.parquetFile(parquetFile)
+      checkAnswer(parquetDF, expectDF)
+    }
+  }
+
+  test("SQLContext.jsonFile") {
+    val sqlContext = spark.sqlContext
+    withTempDir { dir =>
+      val jsonFile = s"${dir.toString}/${System.currentTimeMillis()}"
+      val expectDF = spark.range(10).toDF()
+      expectDF.write.json(jsonFile)
+      var jsonDF = sqlContext.jsonFile(jsonFile)
+      checkAnswer(jsonDF, expectDF)
+      assert(jsonDF.schema === expectDF.schema.asNullable)
+
+      var schema = expectDF.schema
+      jsonDF = sqlContext.jsonFile(jsonFile, schema)
+      checkAnswer(jsonDF, expectDF)
+      assert(jsonDF.schema === schema.asNullable)
+
+      jsonDF = sqlContext.jsonFile(jsonFile, 0.9)
+      checkAnswer(jsonDF, expectDF)
+
+      val jsonRDD = sparkContext.parallelize(Seq("{\"name\":\"Jack\",\"age\":20}",
+        "{\"name\":\"Marry\",\"age\":18}"))
+      jsonDF = sqlContext.jsonRDD(jsonRDD)
+      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+      jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD())
+      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+
+      schema = StructType(StructField("name", StringType, false) ::
+        StructField("age", IntegerType, false) :: Nil)
+      jsonDF = sqlContext.jsonRDD(jsonRDD, schema)
+      checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil)
+      jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), schema)
+      checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil)
+
+
+      jsonDF = sqlContext.jsonRDD(jsonRDD, 0.9)
+      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+      jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), 0.9)
+      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
+    }
+  }
+
+  test("SQLContext.load") {
+    withTempDir { dir =>
+      val path = s"${dir.toString}/${System.currentTimeMillis()}"
+      val expectDF = spark.range(10).toDF()
+      expectDF.write.parquet(path)
+      val sqlContext = spark.sqlContext
+
+      var loadDF = sqlContext.load(path)
+      checkAnswer(loadDF, expectDF)
+
+      loadDF = sqlContext.load(path, "parquet")
+      checkAnswer(loadDF, expectDF)
+
+      loadDF = sqlContext.load("parquet", Map("path" -> path))
+      checkAnswer(loadDF, expectDF)
+
+      loadDF = sqlContext.load("parquet", expectDF.schema, Map("path" -> path))
+      checkAnswer(loadDF, expectDF)
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9cba95f..fd691f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1699,4 +1699,18 @@ class JDBCSuite extends QueryTest
     assert(JdbcDialects.get("jdbc:teradata://localhost/db") === TeradataDialect)
     assert(JdbcDialects.get("jdbc:Teradata://localhost/db") === TeradataDialect)
   }
+
+  test("SQLContext.jdbc (deprecated)") {
+    val sqlContext = spark.sqlContext
+    var jdbcDF = sqlContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE")
+    checkAnswer(jdbcDF, Row("fred", 1) :: Row("mary", 2) :: Row("joe 'foo' \"bar\"", 3) :: Nil)
+
+    jdbcDF = sqlContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3)
+    checkNumPartitions(jdbcDF, 3)
+    checkAnswer(jdbcDF, Row("fred", 1) :: Row("mary", 2) :: Row("joe 'foo' \"bar\"", 3) :: Nil)
+
+    val parts = Array[String]("THEID = 2")
+    jdbcDF = sqlContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts)
+    checkAnswer(jdbcDF, Row("mary", 2) :: Nil)
+  }
 }
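A footnote on the partitioned `jdbc` variant exercised in the test above: since it delegates straight to `DataFrameReader.jdbc`, the test's call is equivalent to the sketch below (assuming a SparkSession named `spark`; the URL is an illustrative placeholder where the suite uses its H2 `urlWithUserAndPass`). Note that `lowerBound` and `upperBound` only decide the partition stride over `columnName`; they do not filter any rows out of the result.

    import java.util.Properties

    val url = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"  // illustrative JDBC URL

    // The deprecated form restored by this commit:
    //   sqlContext.jdbc(url, "TEST.PEOPLE", "THEID", 0, 4, 3)
    // delegates to:
    val df = spark.read.jdbc(
      url,
      "TEST.PEOPLE",
      columnName = "THEID",   // integral column used to compute partition predicates
      lowerBound = 0L,        // together with upperBound, decides the stride
      upperBound = 4L,
      numPartitions = 3,      // the THEID range is split into 3 partitions
      connectionProperties = new Properties)
    assert(df.rdd.getNumPartitions == 3)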