This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3884455  [SPARK-31087] [SQL] Add Back Multiple Removed APIs
3884455 is described below

commit 3884455780a214c620f309e00d5a083039746755
Author: gatorsmile <gatorsm...@gmail.com>
AuthorDate: Sat Mar 28 22:05:16 2020 -0700

    [SPARK-31087] [SQL] Add Back Multiple Removed APIs
    
    ### What changes were proposed in this pull request?
    
    Based on the discussion in the mailing list [[Proposal] Modification to 
Spark's Semantic Versioning 
Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html)
 , this PR is to add back the following APIs whose maintenance cost are 
relatively small.
    
    - functions.toDegrees/toRadians
    - functions.approxCountDistinct
    - functions.monotonicallyIncreasingId
    - Column.!==
    - Dataset.explode
    - Dataset.registerTempTable
    - SQLContext.getOrCreate, setActive, clearActive, constructors
    
    Below is the other removed APIs in the original PR, but not added back in 
this PR [https://issues.apache.org/jira/browse/SPARK-25908]:
    
    - Remove some AccumulableInfo .apply() methods
    - Remove non-label-specific multiclass precision/recall/fScore in favor of 
accuracy
    - Remove unused Python StorageLevel constants
    - Remove unused multiclass option in libsvm parsing
    - Remove references to deprecated spark configs like spark.yarn.am.port
    - Remove TaskContext.isRunningLocally
    - Remove ShuffleMetrics.shuffle* methods
    - Remove BaseReadWrite.context in favor of session
    
    ### Why are the changes needed?
    Avoid breaking the APIs that are commonly used.
    
    ### Does this PR introduce any user-facing change?
    Adding back the APIs that were removed in 3.0 branch does not introduce the 
user-facing changes, because Spark 3.0 has not been released.
    
    ### How was this patch tested?
    Added a new test suite for these APIs.
    
    Author: gatorsmile <gatorsm...@gmail.com>
    Author: yi.wu <yi...@databricks.com>
    
    Closes #27821 from gatorsmile/addAPIBackV2.
---
 project/MimaExcludes.scala                         |   8 --
 python/pyspark/sql/dataframe.py                    |  19 ++++
 python/pyspark/sql/functions.py                    |  11 ++
 .../main/scala/org/apache/spark/sql/Column.scala   |  18 ++++
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  98 ++++++++++++++++++
 .../scala/org/apache/spark/sql/SQLContext.scala    |  50 ++++++++-
 .../scala/org/apache/spark/sql/functions.scala     |  79 ++++++++++++++
 .../org/apache/spark/sql/DataFrameSuite.scala      |  46 +++++++++
 .../org/apache/spark/sql/DeprecatedAPISuite.scala  | 114 +++++++++++++++++++++
 .../org/apache/spark/sql/SQLContextSuite.scala     |  30 ++++--
 10 files changed, 458 insertions(+), 15 deletions(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3f521e6..f28ae56 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -242,14 +242,6 @@ object MimaExcludes {
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.ShuffleWriteMetrics.shuffleWriteTime"),
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.ShuffleWriteMetrics.shuffleRecordsWritten"),
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.AccumulableInfo.apply"),
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.approxCountDistinct"),
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.toRadians"),
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.toDegrees"),
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.monotonicallyIncreasingId"),
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.clearActive"),
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.getOrCreate"),
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.setActive"),
-    
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.SQLContext.this"),
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.fMeasure"),
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.recall"),
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.precision"),
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 971cdb1..78b5746 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -122,6 +122,25 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         rdd = self._jdf.toJSON()
         return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
 
+    @since(1.3)
+    def registerTempTable(self, name):
+        """Registers this DataFrame as a temporary table using the given name.
+
+        The lifetime of this temporary table is tied to the 
:class:`SparkSession`
+        that was used to create this :class:`DataFrame`.
+
+        >>> df.registerTempTable("people")
+        >>> df2 = spark.sql("select * from people")
+        >>> sorted(df.collect()) == sorted(df2.collect())
+        True
+        >>> spark.catalog.dropTempView("people")
+
+        .. note:: Deprecated in 2.0, use createOrReplaceTempView instead.
+        """
+        warnings.warn(
+            "Deprecated in 2.0, use createOrReplaceTempView instead.", 
DeprecationWarning)
+        self._jdf.createOrReplaceTempView(name)
+
     @since(2.0)
     def createTempView(self, name):
         """Creates a local temporary view with this :class:`DataFrame`.
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index e089963..27030e5 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -296,6 +296,8 @@ _window_functions = {
 
 # Wraps deprecated functions (keys) with the messages (values).
 _functions_deprecated = {
+    'toDegrees': 'Deprecated in 2.1, use degrees instead.',
+    'toRadians': 'Deprecated in 2.1, use radians instead.',
 }
 
 for _name, _doc in _functions.items():
@@ -319,6 +321,15 @@ for _name, _doc in _functions_2_4.items():
 del _name, _doc
 
 
+@since(1.3)
+def approxCountDistinct(col, rsd=None):
+    """
+    .. note:: Deprecated in 2.1, use :func:`approx_count_distinct` instead.
+    """
+    warnings.warn("Deprecated in 2.1, use approx_count_distinct instead.", 
DeprecationWarning)
+    return approx_count_distinct(col, rsd)
+
+
 @since(2.1)
 def approx_count_distinct(col, rsd=None):
     """Aggregate function: returns a new :class:`Column` for approximate 
distinct count of
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 8bd5835..49c9f83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -331,6 +331,24 @@ class Column(val expr: Expression) extends Logging {
    *   df.filter( col("colA").notEqual(col("colB")) );
    * }}}
    *
+   * @group expr_ops
+   * @since 1.3.0
+    */
+  @deprecated("!== does not have the same precedence as ===, use =!= instead", 
"2.0.0")
+  def !== (other: Any): Column = this =!= other
+
+  /**
+   * Inequality test.
+   * {{{
+   *   // Scala:
+   *   df.select( df("colA") !== df("colB") )
+   *   df.select( !(df("colA") === df("colB")) )
+   *
+   *   // Java:
+   *   import static org.apache.spark.sql.functions.*;
+   *   df.filter( col("colA").notEqual(col("colB")) );
+   * }}}
+   *
    * @group java_expr_ops
    * @since 1.3.0
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e1e3e8e..c897170 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -22,6 +22,7 @@ import java.io.{ByteArrayOutputStream, CharArrayWriter, 
DataOutputStream}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringUtils
@@ -34,6 +35,7 @@ import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.api.r.RRDD
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, 
ScalaReflection}
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
@@ -2266,6 +2268,90 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * (Scala-specific) Returns a new Dataset where each row has been expanded 
to zero or more
+   * rows by the provided function. This is similar to a `LATERAL VIEW` in 
HiveQL. The columns of
+   * the input row are implicitly joined with each row that is output by the 
function.
+   *
+   * Given that this is deprecated, as an alternative, you can explode columns 
either using
+   * `functions.explode()` or `flatMap()`. The following example uses these 
alternatives to count
+   * the number of books that contain a given word:
+   *
+   * {{{
+   *   case class Book(title: String, words: String)
+   *   val ds: Dataset[Book]
+   *
+   *   val allWords = ds.select('title, explode(split('words, " ")).as("word"))
+   *
+   *   val bookCountPerWord = 
allWords.groupBy("word").agg(countDistinct("title"))
+   * }}}
+   *
+   * Using `flatMap()` this can similarly be exploded as:
+   *
+   * {{{
+   *   ds.flatMap(_.words.split(" "))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  @deprecated("use flatMap() or select() with functions.explode() instead", 
"2.0.0")
+  def explode[A <: Product : TypeTag](input: Column*)(f: Row => 
TraversableOnce[A]): DataFrame = {
+    val elementSchema = 
ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
+
+    val convert = 
CatalystTypeConverters.createToCatalystConverter(elementSchema)
+
+    val rowFunction =
+      f.andThen(_.map(convert(_).asInstanceOf[InternalRow]))
+    val generator = UserDefinedGenerator(elementSchema, rowFunction, 
input.map(_.expr))
+
+    withPlan {
+      Generate(generator, unrequiredChildIndex = Nil, outer = false,
+        qualifier = None, generatorOutput = Nil, logicalPlan)
+    }
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset where a single column has been 
expanded to zero
+   * or more rows by the provided function. This is similar to a `LATERAL 
VIEW` in HiveQL. All
+   * columns of the input row are implicitly joined with each value that is 
output by the function.
+   *
+   * Given that this is deprecated, as an alternative, you can explode columns 
either using
+   * `functions.explode()`:
+   *
+   * {{{
+   *   ds.select(explode(split('words, " ")).as("word"))
+   * }}}
+   *
+   * or `flatMap()`:
+   *
+   * {{{
+   *   ds.flatMap(_.words.split(" "))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  @deprecated("use flatMap() or select() with functions.explode() instead", 
"2.0.0")
+  def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A 
=> TraversableOnce[B])
+    : DataFrame = {
+    val dataType = ScalaReflection.schemaFor[B].dataType
+    val attributes = AttributeReference(outputColumn, dataType)() :: Nil
+    // TODO handle the metadata?
+    val elementSchema = attributes.toStructType
+
+    def rowFunction(row: Row): TraversableOnce[InternalRow] = {
+      val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
+      f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o)))
+    }
+    val generator = UserDefinedGenerator(elementSchema, rowFunction, 
apply(inputColumn).expr :: Nil)
+
+    withPlan {
+      Generate(generator, unrequiredChildIndex = Nil, outer = false,
+        qualifier = None, generatorOutput = Nil, logicalPlan)
+    }
+  }
+
+  /**
    * Returns a new Dataset by adding a column or replacing the existing column 
that has
    * the same name.
    *
@@ -3130,6 +3216,18 @@ class Dataset[T] private[sql](
   def javaRDD: JavaRDD[T] = toJavaRDD
 
   /**
+   * Registers this Dataset as a temporary table using the given name. The 
lifetime of this
+   * temporary table is tied to the [[SparkSession]] that was used to create 
this Dataset.
+   *
+   * @group basic
+   * @since 1.6.0
+   */
+  @deprecated("Use createOrReplaceTempView(viewName) instead.", "2.0.0")
+  def registerTempTable(tableName: String): Unit = {
+    createOrReplaceTempView(tableName)
+  }
+
+  /**
    * Creates a local temporary view using the given name. The lifetime of this
    * temporary view is tied to the [[SparkSession]] that was used to create 
this Dataset.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bbcc842..68ce82d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,7 +24,7 @@ import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, 
Unstable}
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.rdd.RDD
@@ -64,6 +64,15 @@ class SQLContext private[sql](val sparkSession: SparkSession)
 
   // Note: Since Spark 2.0 this class has become a wrapper of SparkSession, 
where the
   // real functionality resides. This class remains mainly for backward 
compatibility.
+
+  @deprecated("Use SparkSession.builder instead", "2.0.0")
+  def this(sc: SparkContext) = {
+    this(SparkSession.builder().sparkContext(sc).getOrCreate())
+  }
+
+  @deprecated("Use SparkSession.builder instead", "2.0.0")
+  def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
+
   // TODO: move this logic into SparkSession
 
   private[sql] def sessionState: SessionState = sparkSession.sessionState
@@ -998,6 +1007,45 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
 object SQLContext {
 
   /**
+   * Get the singleton SQLContext if it exists or create a new one using the 
given SparkContext.
+   *
+   * This function can be used to create a singleton SQLContext object that 
can be shared across
+   * the JVM.
+   *
+   * If there is an active SQLContext for current thread, it will be returned 
instead of the global
+   * one.
+   *
+   * @since 1.5.0
+   */
+  @deprecated("Use SparkSession.builder instead", "2.0.0")
+  def getOrCreate(sparkContext: SparkContext): SQLContext = {
+    SparkSession.builder().sparkContext(sparkContext).getOrCreate().sqlContext
+  }
+
+  /**
+   * Changes the SQLContext that will be returned in this thread and its 
children when
+   * SQLContext.getOrCreate() is called. This can be used to ensure that a 
given thread receives
+   * a SQLContext with an isolated session, instead of the global (first 
created) context.
+   *
+   * @since 1.6.0
+   */
+  @deprecated("Use SparkSession.setActiveSession instead", "2.0.0")
+  def setActive(sqlContext: SQLContext): Unit = {
+    SparkSession.setActiveSession(sqlContext.sparkSession)
+  }
+
+  /**
+   * Clears the active SQLContext for current thread. Subsequent calls to 
getOrCreate will
+   * return the first created context instead of a thread-local override.
+   *
+   * @since 1.6.0
+   */
+  @deprecated("Use SparkSession.clearActiveSession instead", "2.0.0")
+  def clearActive(): Unit = {
+    SparkSession.clearActiveSession()
+  }
+
+  /**
    * Converts an iterator of Java Beans to InternalRow using the provided
    * bean info & schema. This is not related to the singleton, but is a static
    * method for internal use.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index deafb8c..e8141b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -212,6 +212,36 @@ object functions {
   
//////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
+   * @group agg_funcs
+   * @since 1.3.0
+   */
+  @deprecated("Use approx_count_distinct", "2.1.0")
+  def approxCountDistinct(e: Column): Column = approx_count_distinct(e)
+
+  /**
+   * @group agg_funcs
+   * @since 1.3.0
+   */
+  @deprecated("Use approx_count_distinct", "2.1.0")
+  def approxCountDistinct(columnName: String): Column = 
approx_count_distinct(columnName)
+
+  /**
+   * @group agg_funcs
+   * @since 1.3.0
+   */
+  @deprecated("Use approx_count_distinct", "2.1.0")
+  def approxCountDistinct(e: Column, rsd: Double): Column = 
approx_count_distinct(e, rsd)
+
+  /**
+   * @group agg_funcs
+   * @since 1.3.0
+   */
+  @deprecated("Use approx_count_distinct", "2.1.0")
+  def approxCountDistinct(columnName: String, rsd: Double): Column = {
+    approx_count_distinct(Column(columnName), rsd)
+  }
+
+  /**
    * Aggregate function: returns the approximate number of distinct items in a 
group.
    *
    * @group agg_funcs
@@ -1129,6 +1159,27 @@ object functions {
    * }}}
    *
    * @group normal_funcs
+   * @since 1.4.0
+   */
+  @deprecated("Use monotonically_increasing_id()", "2.0.0")
+  def monotonicallyIncreasingId(): Column = monotonically_increasing_id()
+
+  /**
+   * A column expression that generates monotonically increasing 64-bit 
integers.
+   *
+   * The generated ID is guaranteed to be monotonically increasing and unique, 
but not consecutive.
+   * The current implementation puts the partition ID in the upper 31 bits, 
and the record number
+   * within each partition in the lower 33 bits. The assumption is that the 
data frame has
+   * less than 1 billion partitions, and each partition has less than 8 
billion records.
+   *
+   * As an example, consider a `DataFrame` with two partitions, each with 3 
records.
+   * This expression would return the following IDs:
+   *
+   * {{{
+   * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
+   * }}}
+   *
+   * @group normal_funcs
    * @since 1.6.0
    */
   def monotonically_increasing_id(): Column = withExpr { 
MonotonicallyIncreasingID() }
@@ -2095,6 +2146,20 @@ object functions {
   def tanh(columnName: String): Column = tanh(Column(columnName))
 
   /**
+   * @group math_funcs
+   * @since 1.4.0
+   */
+  @deprecated("Use degrees", "2.1.0")
+  def toDegrees(e: Column): Column = degrees(e)
+
+  /**
+   * @group math_funcs
+   * @since 1.4.0
+   */
+  @deprecated("Use degrees", "2.1.0")
+  def toDegrees(columnName: String): Column = degrees(Column(columnName))
+
+  /**
    * Converts an angle measured in radians to an approximately equivalent 
angle measured in degrees.
    *
    * @param e angle in radians
@@ -2117,6 +2182,20 @@ object functions {
   def degrees(columnName: String): Column = degrees(Column(columnName))
 
   /**
+   * @group math_funcs
+   * @since 1.4.0
+   */
+  @deprecated("Use radians", "2.1.0")
+  def toRadians(e: Column): Column = radians(e)
+
+  /**
+   * @group math_funcs
+   * @since 1.4.0
+   */
+  @deprecated("Use radians", "2.1.0")
+  def toRadians(columnName: String): Column = radians(Column(columnName))
+
+  /**
    * Converts an angle measured in degrees to an approximately equivalent 
angle measured in radians.
    *
    * @param e angle in degrees
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 72aa7bf..1762bc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -109,6 +109,31 @@ class DataFrameSuite extends QueryTest
     dfAlias.col("t2.c")
   }
 
+  test("simple explode") {
+    val df = Seq(Tuple1("a b c"), Tuple1("d e")).toDF("words")
+
+    checkAnswer(
+      df.explode("words", "word") { word: String => word.split(" ").toSeq 
}.select('word),
+      Row("a") :: Row("b") :: Row("c") :: Row("d") ::Row("e") :: Nil
+    )
+  }
+
+  test("explode") {
+    val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
+    val df2 =
+      df.explode('letters) {
+        case Row(letters: String) => letters.split(" ").map(Tuple1(_)).toSeq
+      }
+
+    checkAnswer(
+      df2
+        .select('_1 as 'letter, 'number)
+        .groupBy('letter)
+        .agg(countDistinct('number)),
+      Row("a", 3) :: Row("b", 2) :: Row("c", 1) :: Nil
+    )
+  }
+
   test("Star Expansion - CreateStruct and CreateArray") {
     val structDf = testData2.select("a", "b").as("record")
     // CreateStruct and CreateArray in aggregateExpressions
@@ -185,6 +210,27 @@ class DataFrameSuite extends QueryTest
     }
   }
 
+  test("Star Expansion - ds.explode should fail with a meaningful message if 
it takes a star") {
+    val df = Seq(("1", "1,2"), ("2", "4"), ("3", "7,8,9")).toDF("prefix", 
"csv")
+    val e = intercept[AnalysisException] {
+      df.explode($"*") { case Row(prefix: String, csv: String) =>
+        csv.split(",").map(v => Tuple1(prefix + ":" + v)).toSeq
+      }.queryExecution.assertAnalyzed()
+    }
+    assert(e.getMessage.contains("Invalid usage of '*' in 
explode/json_tuple/UDTF"))
+
+    checkAnswer(
+      df.explode('prefix, 'csv) { case Row(prefix: String, csv: String) =>
+        csv.split(",").map(v => Tuple1(prefix + ":" + v)).toSeq
+      },
+      Row("1", "1,2", "1:1") ::
+        Row("1", "1,2", "1:2") ::
+        Row("2", "4", "2:4") ::
+        Row("3", "7,8,9", "3:7") ::
+        Row("3", "7,8,9", "3:8") ::
+        Row("3", "7,8,9", "3:9") :: Nil)
+  }
+
   test("Star Expansion - explode should fail with a meaningful message if it 
takes a star") {
     val df = Seq(("1,2"), ("4"), ("7,8,9")).toDF("csv")
     val e = intercept[AnalysisException] {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
index c31ef99..25b8849 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
@@ -17,10 +17,124 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
 
 class DeprecatedAPISuite extends QueryTest with SharedSparkSession {
+  import MathFunctionsTestData.DoubleData
+  import testImplicits._
+
+  private lazy val doubleData = (1 to 10).map(i => DoubleData(i * 0.2 - 1, i * 
-0.2 + 1)).toDF()
+
+  private def testOneToOneMathFunction[
+    @specialized(Int, Long, Float, Double) T,
+    @specialized(Int, Long, Float, Double) U](
+        c: Column => Column,
+        f: T => U): Unit = {
+    checkAnswer(
+      doubleData.select(c('a)),
+      (1 to 10).map(n => Row(f((n * 0.2 - 1).asInstanceOf[T])))
+    )
+
+    checkAnswer(
+      doubleData.select(c('b)),
+      (1 to 10).map(n => Row(f((-n * 0.2 + 1).asInstanceOf[T])))
+    )
+
+    checkAnswer(
+      doubleData.select(c(lit(null))),
+      (1 to 10).map(_ => Row(null))
+    )
+  }
+
+  test("functions.toDegrees") {
+    testOneToOneMathFunction(toDegrees, math.toDegrees)
+    withView("t") {
+      val df = Seq(0, 1, 1.5).toDF("a")
+      df.createOrReplaceTempView("t")
+
+      checkAnswer(
+        sql("SELECT degrees(0), degrees(1), degrees(1.5)"),
+        Seq(0).toDF().select(toDegrees(lit(0)), toDegrees(lit(1)), 
toDegrees(lit(1.5)))
+      )
+      checkAnswer(
+        sql("SELECT degrees(a) FROM t"),
+        df.select(toDegrees("a"))
+      )
+    }
+  }
+
+  test("functions.toRadians") {
+    testOneToOneMathFunction(toRadians, math.toRadians)
+    withView("t") {
+      val df = Seq(0, 1, 1.5).toDF("a")
+      df.createOrReplaceTempView("t")
+
+      checkAnswer(
+        sql("SELECT radians(0), radians(1), radians(1.5)"),
+        Seq(0).toDF().select(toRadians(lit(0)), toRadians(lit(1)), 
toRadians(lit(1.5)))
+      )
+      checkAnswer(
+        sql("SELECT radians(a) FROM t"),
+        df.select(toRadians("a"))
+      )
+    }
+  }
+
+  test("functions.approxCountDistinct") {
+    withView("t") {
+      val df = Seq(0, 1, 2).toDF("a")
+      df.createOrReplaceTempView("t")
+      checkAnswer(
+        sql("SELECT approx_count_distinct(a) FROM t"),
+        df.select(approxCountDistinct("a")))
+    }
+  }
+
+  test("functions.monotonicallyIncreasingId") {
+    // Make sure we have 2 partitions, each with 2 records.
+    val df = sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
+      Iterator(Tuple1(1), Tuple1(2))
+    }.toDF("a")
+    checkAnswer(
+      df.select(monotonicallyIncreasingId(), 
expr("monotonically_increasing_id()")),
+      Row(0L, 0L) ::
+        Row(1L, 1L) ::
+        Row((1L << 33) + 0L, (1L << 33) + 0L) ::
+        Row((1L << 33) + 1L, (1L << 33) + 1L) :: Nil
+    )
+  }
+
+  test("Column.!==") {
+    val nullData = Seq(
+      (Some(1), Some(1)), (Some(1), Some(2)), (Some(1), None), (None, 
None)).toDF("a", "b")
+    checkAnswer(
+      nullData.filter($"b" !== 1),
+      Row(1, 2) :: Nil)
+
+    checkAnswer(nullData.filter($"b" !== null), Nil)
+
+    checkAnswer(
+      nullData.filter($"a" !== $"b"),
+      Row(1, 2) :: Nil)
+  }
+
+  test("Dataset.registerTempTable") {
+    withTempView("t") {
+      Seq(1).toDF().registerTempTable("t")
+      assert(spark.catalog.tableExists("t"))
+    }
+  }
+
+  test("SQLContext.setActive/clearActive") {
+    val sc = spark.sparkContext
+    val sqlContext = new SQLContext(sc)
+    SQLContext.setActive(sqlContext)
+    assert(SparkSession.getActiveSession === Some(spark))
+    SQLContext.clearActive()
+    assert(SparkSession.getActiveSession === None)
+  }
 
   test("SQLContext.applySchema") {
     val rowRdd = sparkContext.parallelize(Seq(Row("Jack", 20), Row("Marry", 
18)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index aab2ae4..a179982 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -24,14 +24,32 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, 
StructType}
 
+@deprecated("This suite is deprecated to silent compiler deprecation 
warnings", "2.0.0")
 class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
 
   object DummyRule extends Rule[LogicalPlan] {
     def apply(p: LogicalPlan): LogicalPlan = p
   }
 
+  test("getOrCreate instantiates SQLContext") {
+    val sqlContext = SQLContext.getOrCreate(sc)
+    assert(sqlContext != null, "SQLContext.getOrCreate returned null")
+    assert(SQLContext.getOrCreate(sc).eq(sqlContext),
+      "SQLContext created by SQLContext.getOrCreate not returned by 
SQLContext.getOrCreate")
+  }
+
+  test("getOrCreate return the original SQLContext") {
+    val sqlContext = SQLContext.getOrCreate(sc)
+    val newSession = sqlContext.newSession()
+    assert(SQLContext.getOrCreate(sc).eq(sqlContext),
+      "SQLContext.getOrCreate after explicitly created SQLContext did not 
return the context")
+    SparkSession.setActiveSession(newSession.sparkSession)
+    assert(SQLContext.getOrCreate(sc).eq(newSession),
+      "SQLContext.getOrCreate after explicitly setActive() did not return the 
active context")
+  }
+
   test("Sessions of SQLContext") {
-    val sqlContext = 
SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext
+    val sqlContext = SQLContext.getOrCreate(sc)
     val session1 = sqlContext.newSession()
     val session2 = sqlContext.newSession()
 
@@ -59,13 +77,13 @@ class SQLContextSuite extends SparkFunSuite with 
SharedSparkContext {
   }
 
   test("Catalyst optimization passes are modifiable at runtime") {
-    val sqlContext = 
SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext
+    val sqlContext = SQLContext.getOrCreate(sc)
     sqlContext.experimental.extraOptimizations = Seq(DummyRule)
     
assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule))
   }
 
   test("get all tables") {
-    val sqlContext = 
SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext
+    val sqlContext = SQLContext.getOrCreate(sc)
     val df = sqlContext.range(10)
     df.createOrReplaceTempView("listtablessuitetable")
     assert(
@@ -82,7 +100,7 @@ class SQLContextSuite extends SparkFunSuite with 
SharedSparkContext {
   }
 
   test("getting all tables with a database name has no impact on returned 
table names") {
-    val sqlContext = 
SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext
+    val sqlContext = SQLContext.getOrCreate(sc)
     val df = sqlContext.range(10)
     df.createOrReplaceTempView("listtablessuitetable")
     assert(
@@ -99,7 +117,7 @@ class SQLContextSuite extends SparkFunSuite with 
SharedSparkContext {
   }
 
   test("query the returned DataFrame of tables") {
-    val sqlContext = 
SparkSession.builder().sparkContext(sc).getOrCreate().sqlContext
+    val sqlContext = SQLContext.getOrCreate(sc)
     val df = sqlContext.range(10)
     df.createOrReplaceTempView("listtablessuitetable")
 
@@ -109,7 +127,7 @@ class SQLContextSuite extends SparkFunSuite with 
SharedSparkContext {
         StructField("isTemporary", BooleanType, false) :: Nil)
 
     Seq(sqlContext.tables(), sqlContext.sql("SHOW TABLes")).foreach {
-      tableDF =>
+      case tableDF =>
         assert(expectedSchema === tableDF.schema)
 
         tableDF.createOrReplaceTempView("tables")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to