This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 3884455  [SPARK-31087] [SQL] Add Back Multiple Removed APIs
3884455 is described below

commit 3884455780a214c620f309e00d5a083039746755
Author: gatorsmile <gatorsm...@gmail.com>
AuthorDate: Sat Mar 28 22:05:16 2020 -0700

    [SPARK-31087] [SQL] Add Back Multiple Removed APIs
    
    ### What changes were proposed in this pull request?
    Based on the discussion in the mailing list [[Proposal] Modification to Spark's Semantic Versioning Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html), this PR adds back the following APIs, whose maintenance costs are relatively small:
    
    - functions.toDegrees/toRadians
    - functions.approxCountDistinct
    - functions.monotonicallyIncreasingId
    - Column.!==
    - Dataset.explode
    - Dataset.registerTempTable
    - SQLContext.getOrCreate, setActive, clearActive, constructors
    
    The following APIs were also removed in the original PR ([SPARK-25908](https://issues.apache.org/jira/browse/SPARK-25908)) but are not added back in this PR:
    
    - Some AccumulableInfo.apply() methods
    - Non-label-specific multiclass precision/recall/fScore, in favor of accuracy
    - Unused Python StorageLevel constants
    - Unused multiclass option in libsvm parsing
    - References to deprecated spark configs like spark.yarn.am.port
    - TaskContext.isRunningLocally
    - ShuffleMetrics.shuffle* methods
    - BaseReadWrite.context, in favor of session
    
    ### Why are the changes needed?
    Avoid breaking APIs that are commonly used.
    
    ### Does this PR introduce any user-facing change?
    Adding back the APIs that were removed in the 3.0 branch does not introduce user-facing changes, because Spark 3.0 has not been released.
    
    ### How was this patch tested?
    Added a new test suite for these APIs.
    
    Author: gatorsmile <gatorsm...@gmail.com>
    Author: yi.wu <yi...@databricks.com>
    
    Closes #27821 from gatorsmile/addAPIBackV2.
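For orientation, a minimal sketch (not part of the patch; the data, names, and session setup are illustrative) of how each restored deprecated API lines up with the replacement it delegates to:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object RestoredApiSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 90.0), (2, 180.0)).toDF("id", "angle")

    // Each restored API (left) compiles again and forwards to its
    // snake_case replacement (right), with a deprecation warning.
    df.select(toDegrees($"angle"))          // degrees($"angle")
    df.select(toRadians($"angle"))          // radians($"angle")
    df.agg(approxCountDistinct($"id"))      // approx_count_distinct($"id")
    df.select(monotonicallyIncreasingId())  // monotonically_increasing_id()
    df.filter($"id" !== 1)                  // $"id" =!= 1
    df.registerTempTable("t")               // df.createOrReplaceTempView("t")

    spark.stop()
  }
}
```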
---
 project/MimaExcludes.scala                         |   8 --
 python/pyspark/sql/dataframe.py                    |  19 ++++
 python/pyspark/sql/functions.py                    |  11 ++
 .../main/scala/org/apache/spark/sql/Column.scala   |  18 ++++
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  98 ++++++++++++++++++
 .../scala/org/apache/spark/sql/SQLContext.scala    |  50 ++++++++-
 .../scala/org/apache/spark/sql/functions.scala     |  79 ++++++++++++++
 .../org/apache/spark/sql/DataFrameSuite.scala      |  46 +++++++++
 .../org/apache/spark/sql/DeprecatedAPISuite.scala  | 114 +++++++++++++++++++++
 .../org/apache/spark/sql/SQLContextSuite.scala     |  30 ++++--
 10 files changed, 458 insertions(+), 15 deletions(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3f521e6..f28ae56 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -242,14 +242,6 @@ object MimaExcludes {
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.ShuffleWriteMetrics.shuffleWriteTime"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.ShuffleWriteMetrics.shuffleRecordsWritten"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.AccumulableInfo.apply"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.approxCountDistinct"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.toRadians"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.toDegrees"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.monotonicallyIncreasingId"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.clearActive"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.getOrCreate"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.setActive"),
-    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.SQLContext.this"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.fMeasure"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.recall"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.precision"),
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 971cdb1..78b5746 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -122,6 +122,25 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         rdd = self._jdf.toJSON()
         return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
 
+    @since(1.3)
+    def registerTempTable(self, name):
+        """Registers this DataFrame as a temporary table using the given name.
+
+        The lifetime of this temporary table is tied to the :class:`SparkSession`
+        that was used to create this :class:`DataFrame`.
+
+        >>> df.registerTempTable("people")
+        >>> df2 = spark.sql("select * from people")
+        >>> sorted(df.collect()) == sorted(df2.collect())
+        True
+        >>> spark.catalog.dropTempView("people")
+
+        .. note:: Deprecated in 2.0, use createOrReplaceTempView instead.
+        """
+        warnings.warn(
+            "Deprecated in 2.0, use createOrReplaceTempView instead.", DeprecationWarning)
+        self._jdf.createOrReplaceTempView(name)
+
     @since(2.0)
     def createTempView(self, name):
         """Creates a local temporary view with this :class:`DataFrame`.
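A minimal Scala-side sketch of the same round trip (session setup and names are illustrative; `Dataset.registerTempTable` is restored later in this patch and mirrors the Python method above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
import spark.implicits._

val df = Seq(("Alice", 30), ("Bob", 25)).toDF("name", "age")
df.registerTempTable("people")               // deprecated spelling of...
val df2 = spark.sql("SELECT * FROM people")  // ...createOrReplaceTempView("people")
assert(df.collect().toSet == df2.collect().toSet)
spark.catalog.dropTempView("people")
```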
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index e089963..27030e5 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -296,6 +296,8 @@ _window_functions = {
 
 # Wraps deprecated functions (keys) with the messages (values).
 _functions_deprecated = {
+    'toDegrees': 'Deprecated in 2.1, use degrees instead.',
+    'toRadians': 'Deprecated in 2.1, use radians instead.',
 }
 
 for _name, _doc in _functions.items():
@@ -319,6 +321,15 @@ for _name, _doc in _functions_2_4.items():
 del _name, _doc
 
 
+@since(1.3)
+def approxCountDistinct(col, rsd=None):
+    """
+    .. note:: Deprecated in 2.1, use :func:`approx_count_distinct` instead.
+    """
+    warnings.warn("Deprecated in 2.1, use approx_count_distinct instead.", DeprecationWarning)
+    return approx_count_distinct(col, rsd)
+
+
 @since(2.1)
 def approx_count_distinct(col, rsd=None):
     """Aggregate function: returns a new :class:`Column` for approximate distinct count of
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 8bd5835..49c9f83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -331,6 +331,24 @@ class Column(val expr: Expression) extends Logging {
    *   df.filter( col("colA").notEqual(col("colB")) );
    * }}}
    *
+   * @group expr_ops
+   * @since 1.3.0
+   */
+  @deprecated("!== does not have the same precedence as ===, use =!= instead", "2.0.0")
+  def !== (other: Any): Column = this =!= other
+
+  /**
+   * Inequality test.
+   * {{{
+   *   // Scala:
+   *   df.select( df("colA") !== df("colB") )
+   *   df.select( !(df("colA") === df("colB")) )
+   *
+   *   // Java:
+   *   import static org.apache.spark.sql.functions.*;
+   *   df.filter( col("colA").notEqual(col("colB")) );
+   * }}}
+   *
    * @group java_expr_ops
    * @since 1.3.0
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e1e3e8e..c897170 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -22,6 +22,7 @@ import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringUtils
@@ -34,6 +35,7 @@ import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.api.r.RRDD
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
@@ -2266,6 +2268,90 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * (Scala-specific) Returns a new Dataset where each row has been expanded to zero or more
+   * rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. The columns of
+   * the input row are implicitly joined with each row that is output by the function.
+   *
+   * Given that this is deprecated, as an alternative, you can explode columns either using
+   * `functions.explode()` or `flatMap()`.
+   * The following example uses these alternatives to count
+   * the number of books that contain a given word:
+   *
+   * {{{
+   *   case class Book(title: String, words: String)
+   *   val ds: Dataset[Book]
+   *
+   *   val allWords = ds.select('title, explode(split('words, " ")).as("word"))
+   *
+   *   val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title"))
+   * }}}
+   *
+   * Using `flatMap()` this can similarly be exploded as:
+   *
+   * {{{
+   *   ds.flatMap(_.words.split(" "))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  @deprecated("use flatMap() or select() with functions.explode() instead", "2.0.0")
+  def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
+    val elementSchema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
+
+    val convert = CatalystTypeConverters.createToCatalystConverter(elementSchema)
+
+    val rowFunction =
+      f.andThen(_.map(convert(_).asInstanceOf[InternalRow]))
+    val generator = UserDefinedGenerator(elementSchema, rowFunction, input.map(_.expr))
+
+    withPlan {
+      Generate(generator, unrequiredChildIndex = Nil, outer = false,
+        qualifier = None, generatorOutput = Nil, logicalPlan)
+    }
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset where a single column has been expanded to zero
+   * or more rows by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. All
+   * columns of the input row are implicitly joined with each value that is output by the function.
+   *
+   * Given that this is deprecated, as an alternative, you can explode columns either using
+   * `functions.explode()`:
+   *
+   * {{{
+   *   ds.select(explode(split('words, " ")).as("word"))
+   * }}}
+   *
+   * or `flatMap()`:
+   *
+   * {{{
+   *   ds.flatMap(_.words.split(" "))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  @deprecated("use flatMap() or select() with functions.explode() instead", "2.0.0")
+  def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B])
+    : DataFrame = {
+    val dataType = ScalaReflection.schemaFor[B].dataType
+    val attributes = AttributeReference(outputColumn, dataType)() :: Nil
+    // TODO handle the metadata?
+    val elementSchema = attributes.toStructType
+
+    def rowFunction(row: Row): TraversableOnce[InternalRow] = {
+      val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
+      f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o)))
+    }
+    val generator = UserDefinedGenerator(elementSchema, rowFunction, apply(inputColumn).expr :: Nil)
+
+    withPlan {
+      Generate(generator, unrequiredChildIndex = Nil, outer = false,
+        qualifier = None, generatorOutput = Nil, logicalPlan)
+    }
+  }
+
+  /**
    * Returns a new Dataset by adding a column or replacing the existing column that has
    * the same name.
    *
@@ -3130,6 +3216,18 @@ class Dataset[T] private[sql](
   def javaRDD: JavaRDD[T] = toJavaRDD
 
   /**
+   * Registers this Dataset as a temporary table using the given name. The lifetime of this
+   * temporary table is tied to the [[SparkSession]] that was used to create this Dataset.
+   *
+   * @group basic
+   * @since 1.6.0
+   */
+  @deprecated("Use createOrReplaceTempView(viewName) instead.", "2.0.0")
+  def registerTempTable(tableName: String): Unit = {
+    createOrReplaceTempView(tableName)
+  }
+
+  /**
    * Creates a local temporary view using the given name. The lifetime of this
    * temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
    *
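The deprecation message on `Column.!==` above is worth spelling out. In Scala, an operator that ends in `=` but does not start with `=` is parsed as an assignment operator with the lowest possible precedence, so `!==` binds looser than `&&`, while `=!=` keeps ordinary comparison-like precedence. A sketch of the pitfall (column names are illustrative):

```scala
import org.apache.spark.sql.functions.col

// Intended meaning: (a != b) AND (c > 1).
// Because `!==` has assignment-operator (lowest) precedence, `&&` and `>`
// bind tighter, and this parses as: col("a") !== (col("b") && (col("c") > 1))
val surprising = col("a") !== col("b") && col("c") > 1

// `=!=` starts with `=`, so it binds tighter than `&&`, and this
// parses as intended: (col("a") =!= col("b")) && (col("c") > 1)
val intended = col("a") =!= col("b") && col("c") > 1
```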
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bbcc842..68ce82d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,7 +24,7 @@ import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.rdd.RDD
@@ -64,6 +64,15 @@ class SQLContext private[sql](val sparkSession: SparkSession)
   // Note: Since Spark 2.0 this class has become a wrapper of SparkSession, where the
   // real functionality resides. This class remains mainly for backward compatibility.
 
+
+  @deprecated("Use SparkSession.builder instead", "2.0.0")
+  def this(sc: SparkContext) = {
+    this(SparkSession.builder().sparkContext(sc).getOrCreate())
+  }
+
+  @deprecated("Use SparkSession.builder instead", "2.0.0")
+  def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
+
   // TODO: move this logic into SparkSession
 
   private[sql] def sessionState: SessionState = sparkSession.sessionState
@@ -998,6 +1007,45 @@ class SQLContext private[sql](val sparkSession: SparkSession)
 object SQLContext {
 
   /**
+   * Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
+   *
+   * This function can be used to create a singleton SQLContext object that can be shared across
+   * the JVM.
+   *
+   * If there is an active SQLContext for current thread, it will be returned instead of the global
+   * one.
+   *
+   * @since 1.5.0
+   */
+  @deprecated("Use SparkSession.builder instead", "2.0.0")
+  def getOrCreate(sparkContext: SparkContext): SQLContext = {
+    SparkSession.builder().sparkContext(sparkContext).getOrCreate().sqlContext
+  }
+
+  /**
+   * Changes the SQLContext that will be returned in this thread and its children when
+   * SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
+   * a SQLContext with an isolated session, instead of the global (first created) context.
+   *
+   * @since 1.6.0
+   */
+  @deprecated("Use SparkSession.setActiveSession instead", "2.0.0")
+  def setActive(sqlContext: SQLContext): Unit = {
+    SparkSession.setActiveSession(sqlContext.sparkSession)
+  }
+
+  /**
+   * Clears the active SQLContext for current thread. Subsequent calls to getOrCreate will
+   * return the first created context instead of a thread-local override.
+   *
+   * @since 1.6.0
+   */
+  @deprecated("Use SparkSession.clearActiveSession instead", "2.0.0")
+  def clearActive(): Unit = {
+    SparkSession.clearActiveSession()
+  }
+
+  /**
    * Converts an iterator of Java Beans to InternalRow using the provided
    * bean info & schema. This is not related to the singleton, but is a static
    * method for internal use.
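The interaction between the restored `getOrCreate`, `setActive`, and `clearActive` is easiest to see in code. A minimal sketch (the wrapper method is illustrative), following the semantics the restored docs describe and the tests at the end of this patch exercise:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

def demo(sc: SparkContext): Unit = {
  val global = SQLContext.getOrCreate(sc)   // first call creates the shared singleton
  val session = global.newSession()         // isolated session on the same SparkContext

  SQLContext.setActive(session)             // thread-local override
  assert(SQLContext.getOrCreate(sc) eq session)

  SQLContext.clearActive()                  // back to the global (first created) context
  assert(SQLContext.getOrCreate(sc) eq global)
}
```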
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index deafb8c..e8141b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -212,6 +212,36 @@ object functions {
   //////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
+   * @group agg_funcs
+   * @since 1.3.0
+   */
+  @deprecated("Use approx_count_distinct", "2.1.0")
+  def approxCountDistinct(e: Column): Column = approx_count_distinct(e)
+
+  /**
+   * @group agg_funcs
+   * @since 1.3.0
+   */
+  @deprecated("Use approx_count_distinct", "2.1.0")
+  def approxCountDistinct(columnName: String): Column = approx_count_distinct(columnName)
+
+  /**
+   * @group agg_funcs
+   * @since 1.3.0
+   */
+  @deprecated("Use approx_count_distinct", "2.1.0")
+  def approxCountDistinct(e: Column, rsd: Double): Column = approx_count_distinct(e, rsd)
+
+  /**
+   * @group agg_funcs
+   * @since 1.3.0
+   */
+  @deprecated("Use approx_count_distinct", "2.1.0")
+  def approxCountDistinct(columnName: String, rsd: Double): Column = {
+    approx_count_distinct(Column(columnName), rsd)
+  }
+
+  /**
    * Aggregate function: returns the approximate number of distinct items in a group.
    *
    * @group agg_funcs
@@ -1129,6 +1159,27 @@ object functions {
    * }}}
    *
    * @group normal_funcs
+   * @since 1.4.0
+   */
+  @deprecated("Use monotonically_increasing_id()", "2.0.0")
+  def monotonicallyIncreasingId(): Column = monotonically_increasing_id()
+
+  /**
+   * A column expression that generates monotonically increasing 64-bit integers.
+   *
+   * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
+   * The current implementation puts the partition ID in the upper 31 bits, and the record number
+   * within each partition in the lower 33 bits. The assumption is that the data frame has
+   * less than 1 billion partitions, and each partition has less than 8 billion records.
+   *
+   * As an example, consider a `DataFrame` with two partitions, each with 3 records.
+   * This expression would return the following IDs:
+   *
+   * {{{
+   *   0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
+   * }}}
+   *
+   * @group normal_funcs
    * @since 1.6.0
    */
   def monotonically_increasing_id(): Column = withExpr { MonotonicallyIncreasingID() }
@@ -2095,6 +2146,20 @@ object functions {
   def tanh(columnName: String): Column = tanh(Column(columnName))
 
   /**
+   * @group math_funcs
+   * @since 1.4.0
+   */
+  @deprecated("Use degrees", "2.1.0")
+  def toDegrees(e: Column): Column = degrees(e)
+
+  /**
+   * @group math_funcs
+   * @since 1.4.0
+   */
+  @deprecated("Use degrees", "2.1.0")
+  def toDegrees(columnName: String): Column = degrees(Column(columnName))
+
+  /**
    * Converts an angle measured in radians to an approximately equivalent angle measured in degrees.
    *
    * @param e angle in radians
@@ -2117,6 +2182,20 @@ object functions {
   def degrees(columnName: String): Column = degrees(Column(columnName))
 
   /**
+   * @group math_funcs
+   * @since 1.4.0
+   */
+  @deprecated("Use radians", "2.1.0")
+  def toRadians(e: Column): Column = radians(e)
+
+  /**
+   * @group math_funcs
+   * @since 1.4.0
+   */
+  @deprecated("Use radians", "2.1.0")
+  def toRadians(columnName: String): Column = radians(Column(columnName))
+
+  /**
    * Converts an angle measured in degrees to an approximately equivalent angle measured in radians.
    *
    * @param e angle in degrees
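The ID layout documented for `monotonically_increasing_id` above can be reproduced with plain arithmetic; a small sketch (pure Scala, no Spark needed) checking the example IDs from the scaladoc:

```scala
// Partition ID in the upper 31 bits, per-partition record number in the lower 33 bits.
def expectedId(partitionId: Long, recordNumber: Long): Long =
  (partitionId << 33) + recordNumber

// Two partitions with three records each yield the IDs listed in the scaladoc.
val ids = for (p <- 0L to 1L; r <- 0L to 2L) yield expectedId(p, r)
assert(ids == Vector(0L, 1L, 2L, 8589934592L, 8589934593L, 8589934594L))
assert(expectedId(1L, 0L) == (1L << 33))
```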
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 72aa7bf..1762bc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -109,6 +109,31 @@ class DataFrameSuite extends QueryTest
     dfAlias.col("t2.c")
   }
 
+  test("simple explode") {
+    val df = Seq(Tuple1("a b c"), Tuple1("d e")).toDF("words")
+
+    checkAnswer(
+      df.explode("words", "word") { word: String => word.split(" ").toSeq }.select('word),
+      Row("a") :: Row("b") :: Row("c") :: Row("d") :: Row("e") :: Nil
+    )
+  }
+
+  test("explode") {
+    val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
+    val df2 =
+      df.explode('letters) {
+        case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
+      }
+
+    checkAnswer(
+      df2
+        .select('_1 as 'letter, 'number)
+        .groupBy('letter)
+        .agg(countDistinct('number)),
+      Row("a", 3) :: Row("b", 2) :: Row("c", 1) :: Nil
+    )
+  }
+
   test("Star Expansion - CreateStruct and CreateArray") {
     val structDf = testData2.select("a", "b").as("record")
     // CreateStruct and CreateArray in aggregateExpressions
@@ -185,6 +210,27 @@ class DataFrameSuite extends QueryTest
     }
   }
 
+  test("Star Expansion - ds.explode should fail with a meaningful message if it takes a star") {
+    val df = Seq(("1", "1,2"), ("2", "4"), ("3", "7,8,9")).toDF("prefix", "csv")
+    val e = intercept[AnalysisException] {
+      df.explode($"*") { case Row(prefix: String, csv: String) =>
+        csv.split(",").map(v => Tuple1(prefix + ":" + v)).toSeq
+      }.queryExecution.assertAnalyzed()
+    }
+    assert(e.getMessage.contains("Invalid usage of '*' in explode/json_tuple/UDTF"))
+
+    checkAnswer(
+      df.explode('prefix, 'csv) { case Row(prefix: String, csv: String) =>
+        csv.split(",").map(v => Tuple1(prefix + ":" + v)).toSeq
+      },
+      Row("1", "1,2", "1:1") ::
+        Row("1", "1,2", "1:2") ::
+        Row("2", "4", "2:4") ::
+        Row("3", "7,8,9", "3:7") ::
+        Row("3", "7,8,9", "3:8") ::
+        Row("3", "7,8,9", "3:9") :: Nil)
+  }
+
   test("Star Expansion - explode should fail with a meaningful message if it takes a star") {
     val df = Seq(("1,2"), ("4"), ("7,8,9")).toDF("csv")
     val e = intercept[AnalysisException] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
index c31ef99..25b8849 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
@@ -17,10 +17,124 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 
 class DeprecatedAPISuite extends QueryTest with SharedSparkSession {
+  import MathFunctionsTestData.DoubleData
+  import testImplicits._
+
+  private lazy val doubleData = (1 to 10).map(i => DoubleData(i * 0.2 - 1, i * -0.2 + 1)).toDF()
+
+  private def testOneToOneMathFunction[
+      @specialized(Int, Long, Float, Double) T,
+      @specialized(Int, Long, Float, Double) U](
+      c: Column => Column,
+      f: T => U): Unit = {
+    checkAnswer(
+      doubleData.select(c('a)),
+      (1 to 10).map(n => Row(f((n * 0.2 - 1).asInstanceOf[T])))
+    )
+
+    checkAnswer(
+      doubleData.select(c('b)),
+      (1 to 10).map(n => Row(f((-n * 0.2 + 1).asInstanceOf[T])))
+    )
+
+    checkAnswer(
+      doubleData.select(c(lit(null))),
+      (1 to 10).map(_ =>
+        Row(null))
+    )
+  }
+
+  test("functions.toDegrees") {
+    testOneToOneMathFunction(toDegrees, math.toDegrees)
+    withView("t") {
+      val df = Seq(0, 1, 1.5).toDF("a")
+      df.createOrReplaceTempView("t")
+
+      checkAnswer(
+        sql("SELECT degrees(0), degrees(1), degrees(1.5)"),
+        Seq(0).toDF().select(toDegrees(lit(0)), toDegrees(lit(1)), toDegrees(lit(1.5)))
+      )
+      checkAnswer(
+        sql("SELECT degrees(a) FROM t"),
+        df.select(toDegrees("a"))
+      )
+    }
+  }
+
+  test("functions.toRadians") {
+    testOneToOneMathFunction(toRadians, math.toRadians)
+    withView("t") {
+      val df = Seq(0, 1, 1.5).toDF("a")
+      df.createOrReplaceTempView("t")
+
+      checkAnswer(
+        sql("SELECT radians(0), radians(1), radians(1.5)"),
+        Seq(0).toDF().select(toRadians(lit(0)), toRadians(lit(1)), toRadians(lit(1.5)))
+      )
+      checkAnswer(
+        sql("SELECT radians(a) FROM t"),
+        df.select(toRadians("a"))
+      )
+    }
+  }
+
+  test("functions.approxCountDistinct") {
+    withView("t") {
+      val df = Seq(0, 1, 2).toDF("a")
+      df.createOrReplaceTempView("t")
+      checkAnswer(
+        sql("SELECT approx_count_distinct(a) FROM t"),
+        df.select(approxCountDistinct("a")))
+    }
+  }
+
+  test("functions.monotonicallyIncreasingId") {
+    // Make sure we have 2 partitions, each with 2 records.
+    val df = sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
+      Iterator(Tuple1(1), Tuple1(2))
+    }.toDF("a")
+    checkAnswer(
+      df.select(monotonicallyIncreasingId(), expr("monotonically_increasing_id()")),
+      Row(0L, 0L) ::
+        Row(1L, 1L) ::
+        Row((1L << 33) + 0L, (1L << 33) + 0L) ::
+        Row((1L << 33) + 1L, (1L << 33) + 1L) :: Nil
+    )
+  }
+
+  test("Column.!==") {
+    val nullData = Seq(
+      (Some(1), Some(1)), (Some(1), Some(2)), (Some(1), None), (None, None)).toDF("a", "b")
+    checkAnswer(
+      nullData.filter($"b" !== 1),
+      Row(1, 2) :: Nil)
+
+    checkAnswer(nullData.filter($"b" !== null), Nil)
+
+    checkAnswer(
+      nullData.filter($"a" !== $"b"),
+      Row(1, 2) :: Nil)
+  }
+
+  test("Dataset.registerTempTable") {
+    withTempView("t") {
+      Seq(1).toDF().registerTempTable("t")
+      assert(spark.catalog.tableExists("t"))
+    }
+  }
+
+  test("SQLContext.setActive/clearActive") {
+    val sc = spark.sparkContext
+    val sqlContext = new SQLContext(sc)
+    SQLContext.setActive(sqlContext)
+    assert(SparkSession.getActiveSession === Some(spark))
+    SQLContext.clearActive()
+    assert(SparkSession.getActiveSession === None)
+  }
 
   test("SQLContext.applySchema") {
     val rowRdd = sparkContext.parallelize(Seq(Row("Jack", 20), Row("Marry", 18)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index aab2ae4..a179982 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -24,14 +24,32 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
 
+@deprecated("This suite is deprecated to silence compiler deprecation warnings", "2.0.0")
 class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
 
   object DummyRule extends Rule[LogicalPlan] {
     def apply(p: LogicalPlan): LogicalPlan = p
   }
 
+  test("getOrCreate instantiates SQLContext") {
+    val sqlContext = SQLContext.getOrCreate(sc)
+    assert(sqlContext != null, "SQLContext.getOrCreate returned null")
+    assert(SQLContext.getOrCreate(sc).eq(sqlContext),
+      "SQLContext created by SQLContext.getOrCreate not returned by SQLContext.getOrCreate")
+  }
+
original SQLContext") { + val sqlContext = SQLContext.getOrCreate(sc) + val newSession = sqlContext.newSession() + assert(SQLContext.getOrCreate(sc).eq(sqlContext), + "SQLContext.getOrCreate after explicitly created SQLContext did not return the context") + SparkSession.setActiveSession(newSession.sparkSession) + assert(SQLContext.getOrCreate(sc).eq(newSession), + "SQLContext.getOrCreate after explicitly setActive() did not return the active context") + } + test("Sessions of SQLContext") { - val sqlContext = SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext + val sqlContext = SQLContext.getOrCreate(sc) val session1 = sqlContext.newSession() val session2 = sqlContext.newSession() @@ -59,13 +77,13 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { } test("Catalyst optimization passes are modifiable at runtime") { - val sqlContext = SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext + val sqlContext = SQLContext.getOrCreate(sc) sqlContext.experimental.extraOptimizations = Seq(DummyRule) assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule)) } test("get all tables") { - val sqlContext = SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext + val sqlContext = SQLContext.getOrCreate(sc) val df = sqlContext.range(10) df.createOrReplaceTempView("listtablessuitetable") assert( @@ -82,7 +100,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { } test("getting all tables with a database name has no impact on returned table names") { - val sqlContext = SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext + val sqlContext = SQLContext.getOrCreate(sc) val df = sqlContext.range(10) df.createOrReplaceTempView("listtablessuitetable") assert( @@ -99,7 +117,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { } test("query the returned DataFrame of tables") { - val sqlContext = SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext + val sqlContext = SQLContext.getOrCreate(sc) val df = sqlContext.range(10) df.createOrReplaceTempView("listtablessuitetable") @@ -109,7 +127,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { StructField("isTemporary", BooleanType, false) :: Nil) Seq(sqlContext.tables(), sqlContext.sql("SHOW TABLes")).foreach { - tableDF => + case tableDF => assert(expectedSchema === tableDF.schema) tableDF.createOrReplaceTempView("tables") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org