This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 157b72a  [SPARK-33591][SQL] Recognize `null` in partition spec values
157b72a is described below

commit 157b72ac9fa0057d5fd6d7ed52a6c4b22ebd1dfc
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Fri Jan 8 14:14:27 2021 +0000

    [SPARK-33591][SQL] Recognize `null` in partition spec values
    
    ### What changes were proposed in this pull request?
    1. Recognize `null` while parsing partition specs, and put a real `null` instead of the string `"null"` as the partition value.
    2. For the V1 catalog: replace `null` with `__HIVE_DEFAULT_PARTITION__` (see the sketch below).
    3. For V2 catalogs: pass `null` as is, and let catalog implementations decide how to handle `null`s as partition values in the spec.
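    
    For the V1 path this is done by the new `ExternalCatalogUtils.convertNullPartitionValues` helper (see the diff below). A minimal, self-contained sketch of the conversion, with `TablePartitionSpec` spelled out as `Map[String, String]`:
    ```scala
    object NullPartitionValues {
      // Hive's sentinel for a missing partition value.
      val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
    
      type TablePartitionSpec = Map[String, String]
    
      // V1 catalogs: a real null in the spec becomes the Hive default partition
      // name. V2 catalogs receive the null unchanged and decide on their own.
      def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec =
        spec.map { case (k, v) => k -> (if (v == null) DEFAULT_PARTITION_NAME else v) }
    }
    
    // Example: the spec parsed from PARTITION (p1 = null) carries a real null, so
    // convertNullPartitionValues(Map("p1" -> null)) yields
    // Map("p1" -> "__HIVE_DEFAULT_PARTITION__")
    ```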
    
    ### Why are the changes needed?
    Currently, `null` in partition specs is recognized as the string `"null"`, which can lead to incorrect results, for example:
    ```sql
    spark-sql> CREATE TABLE tbl5 (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1);
    spark-sql> INSERT INTO TABLE tbl5 PARTITION (p1 = null) SELECT 0;
    spark-sql> SELECT isnull(p1) FROM tbl5;
    false
    ```
    Even though we inserted a row into the partition with the `null` value, **the resulting table doesn't contain `null`**.
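    
    The root cause is on the parser side: the `NULL` literal in the partition spec fell through to `ctx.getText`, so the partition value was stored as the text `"null"`. A minimal sketch of the fixed dispatch, with hypothetical stand-ins for the generated ANTLR contexts (the actual fix is the one-line `AstBuilder` change in the diff below):
    ```scala
    object PartitionValueParsing {
      // Hypothetical stand-ins for the generated ANTLR parse-tree contexts.
      sealed trait ConstantContext { def getText: String }
      case object NullLiteralContext extends ConstantContext { def getText: String = "null" }
      final case class StringLiteralContext(value: String) extends ConstantContext {
        def getText: String = value
      }
    
      // Mirrors the fixed visitStringConstant: a NULL literal now yields a real
      // null instead of the four-character string "null".
      def visitStringConstant(ctx: ConstantContext): String = ctx match {
        case NullLiteralContext => null
        case StringLiteralContext(s) => s
        case other => other.getText
      }
    }
    ```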
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, the example above works as expected:
    ```sql
    spark-sql> SELECT isnull(p1) FROM tbl5;
    true
    ```
    
    ### How was this patch tested?
    1. By running the affected test suites `SQLQuerySuite`, `AlterTablePartitionV2SQLSuite` and `v1/ShowPartitionsSuite`.
    2. Compiling with Scala 2.13:
    ```
    $ ./dev/change-scala-version.sh 2.13
    $ ./build/sbt -Pscala-2.13 compile
    ```
    
    Closes #30538 from MaxGekk/partition-spec-value-null.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/catalog/ExternalCatalogUtils.scala    | 10 +++++++
 .../sql/catalyst/catalog/InMemoryCatalog.scala     |  7 ++++-
 .../sql/catalyst/catalog/SessionCatalog.scala      |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  1 +
 .../spark/sql/execution/datasources/rules.scala    |  3 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  9 ++++++
 .../command/AlterTableDropPartitionSuiteBase.scala | 11 ++++++++
 .../command/v1/AlterTableDropPartitionSuite.scala  |  1 +
 .../execution/command/v1/ShowPartitionsSuite.scala | 12 ++++++++
 .../command/v2/AlterTableDropPartitionSuite.scala  |  2 +-
 .../spark/sql/hive/HiveExternalCatalog.scala       | 33 ++++++++++++----------
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  2 ++
 12 files changed, 74 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 00445a1..9d6e0a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -161,6 +161,10 @@ object ExternalCatalogUtils {
     }
   }
 
+  private def isNullPartitionValue(value: String): Boolean = {
+    value == null || value == DEFAULT_PARTITION_NAME
+  }
+
   /**
    * Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a
    * partial partition spec w.r.t. PARTITION (a=1,b=2).
@@ -169,9 +173,15 @@ object ExternalCatalogUtils {
       spec1: TablePartitionSpec,
       spec2: TablePartitionSpec): Boolean = {
     spec1.forall {
+      case (partitionColumn, value) if isNullPartitionValue(value) =>
+        isNullPartitionValue(spec2(partitionColumn))
       case (partitionColumn, value) => spec2(partitionColumn) == value
     }
   }
+
+  def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = {
+    spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).toMap
+  }
 }
 
 object CatalogUtils {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 64b4a11..0d16f46 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -541,7 +541,12 @@ class InMemoryCatalog(
 
     listPartitions(db, table, partialSpec).map { partition =>
       partitionColumnNames.map { name =>
-        escapePathName(name) + "=" + escapePathName(partition.spec(name))
+        val partValue = if (partition.spec(name) == null) {
+          DEFAULT_PARTITION_NAME
+        } else {
+          escapePathName(partition.spec(name))
+        }
+        escapePathName(name) + "=" + partValue
       }.mkString("/")
     }.sorted
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 76358ef..0428d12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1178,7 +1178,7 @@ class SessionCatalog(
    */
   private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
     specs.foreach { s =>
-      if (s.values.exists(_.isEmpty)) {
+      if (s.values.exists(v => v != null && v.isEmpty)) {
         val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
         throw QueryCompilationErrors.invalidPartitionSpecError(
           s"The spec ($spec) contains an empty partition column value")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 395a956..4d028f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -511,6 +511,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   protected def visitStringConstant(ctx: ConstantContext): String = withOrigin(ctx) {
     ctx match {
+      case _: NullLiteralContext => null
       case s: StringLiteralContext => createString(s)
       case o => o.getText
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b9866e4..4fd6684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -406,7 +406,8 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
       catalogTable.get.tracksPartitionsInCatalog
     if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
       // empty partition column value
-      if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {
+      if (normalizedPartSpec.map(_._2)
+          .filter(_.isDefined).map(_.get).exists(v => v != null && v.isEmpty)) {
         val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
         throw new AnalysisException(
           s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 3f55a88..7526bf0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3854,6 +3854,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
 
     assert(unions.size == 1)
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "part_table"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
+      sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
+      checkAnswer(sql(s"SELECT * FROM $t"), Row(0, null))
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
index aadcda4..942a3e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
@@ -39,6 +39,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
   override val command = "ALTER TABLE .. DROP PARTITION"
 
   protected def notFullPartitionSpecErr: String
+  protected def nullPartitionValue: String
 
   protected def checkDropPartition(
       t: String,
@@ -170,4 +171,14 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
       QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 1)))
     }
   }
+
+  test("SPARK-33591: null as a partition value") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
+      sql(s"ALTER TABLE $t ADD PARTITION (p1 = null)")
+      checkPartitions(t, Map("p1" -> nullPartitionValue))
+      sql(s"ALTER TABLE $t DROP PARTITION (p1 = null)")
+      checkPartitions(t)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableDropPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableDropPartitionSuite.scala
index a6490eb..509c0be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableDropPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableDropPartitionSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.command
  */
 trait AlterTableDropPartitionSuiteBase extends command.AlterTableDropPartitionSuiteBase {
   override protected val notFullPartitionSpecErr = "The following partitions not found in table"
+  override protected def nullPartitionValue: String = "__HIVE_DEFAULT_PARTITION__"
 
   test("purge partition data") {
     withNamespaceAndTable("ns", "tbl") { t =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
index e85d62c..a26e297 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
@@ -69,6 +69,18 @@ trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase {
       assert(errMsg.contains("'SHOW PARTITIONS' expects a table"))
     }
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "part_table"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
+      sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
+      checkAnswer(sql(s"SHOW PARTITIONS $t"), Row("p1=__HIVE_DEFAULT_PARTITION__"))
+      checkAnswer(
+        sql(s"SHOW PARTITIONS $t PARTITION (p1 = null)"),
+        Row("p1=__HIVE_DEFAULT_PARTITION__"))
+    }
+  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableDropPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableDropPartitionSuite.scala
index d6890d6..3515fa3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableDropPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableDropPartitionSuite.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql.execution.command
 class AlterTableDropPartitionSuite
   extends command.AlterTableDropPartitionSuiteBase
   with CommandSuiteBase {
-
   override protected val notFullPartitionSpecErr = "Partition spec is invalid"
+  override protected def nullPartitionValue: String = "null"
 
   test("SPARK-33650: drop partition into a table which doesn't support partition management") {
     withNamespaceAndTable("ns", "tbl", s"non_part_$catalog") { t =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index b4aa073..eeffe4f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -942,9 +942,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Hive metastore is not case preserving and the partition columns are always lower cased. We need
   // to lower case the column names in partition specification before calling partition related Hive
   // APIs, to match this behaviour.
-  private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
+  private def toMetaStorePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
     // scalastyle:off caselocale
-    spec.map { case (k, v) => k.toLowerCase -> v }
+    val lowNames = spec.map { case (k, v) => k.toLowerCase -> v }
+    ExternalCatalogUtils.convertNullPartitionValues(lowNames)
     // scalastyle:on caselocale
   }
 
@@ -993,8 +994,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       }
       p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
     }
-    val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
-    client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
+    val metaStoreParts = partsWithLocation
+      .map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
+    client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
   }
 
   override def dropPartitions(
@@ -1006,7 +1008,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       retainData: Boolean): Unit = withClient {
     requireTableExists(db, table)
     client.dropPartitions(
-      db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
+      db, table, parts.map(toMetaStorePartitionSpec), ignoreIfNotExists, purge, retainData)
   }
 
   override def renamePartitions(
@@ -1015,7 +1017,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
     client.renamePartitions(
-      db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
+      db, table, specs.map(toMetaStorePartitionSpec), newSpecs.map(toMetaStorePartitionSpec))
 
     val tableMeta = getTable(db, table)
     val partitionColumnNames = tableMeta.partitionColumnNames
@@ -1031,7 +1033,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val fs = tablePath.getFileSystem(hadoopConf)
       val newParts = newSpecs.map { spec =>
         val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
-        val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+        val partition = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
         partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
       }
       alterPartitions(db, table, newParts)
@@ -1141,12 +1143,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
-    val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    val metaStoreParts = newParts.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
 
     val rawTable = getRawTable(db, table)
 
     // convert partition statistics to properties so that we can persist them through hive api
-    val withStatsProps = lowerCasedParts.map { p =>
+    val withStatsProps = metaStoreParts.map { p =>
       if (p.stats.isDefined) {
         val statsProperties = statsToProperties(p.stats.get)
         p.copy(parameters = p.parameters ++ statsProperties)
@@ -1162,7 +1164,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = withClient {
-    val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+    val part = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
     restorePartitionMetadata(part, getTable(db, table))
   }
 
@@ -1200,7 +1202,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
-    client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
+    client.getPartitionOption(db, table, toMetaStorePartitionSpec(spec)).map { part =>
       restorePartitionMetadata(part, getTable(db, table))
     }
   }
@@ -1215,7 +1217,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val catalogTable = getTable(db, table)
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
     val clientPartitionNames =
-      client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
+      client.getPartitionNames(catalogTable, partialSpec.map(toMetaStorePartitionSpec))
     clientPartitionNames.map { partitionPath =>
       val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
       partSpec.map { case (partName, partValue) =>
@@ -1234,11 +1236,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
     val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
-    val res = client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
-      part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
+    val metaStoreSpec = partialSpec.map(toMetaStorePartitionSpec)
+    val res = client.getPartitions(db, table, metaStoreSpec)
+      .map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
     }
 
-    partialSpec match {
+    metaStoreSpec match {
       // This might be a bug of Hive: When the partition value inside the partial partition spec
       // contains dot, and we ask Hive to list partitions w.r.t. the partial partition spec, Hive
       // treats dot as matching any single character and may return more partitions than we
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 63e4688..bfb24cf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -133,6 +133,7 @@ case class InsertIntoHiveTable(
     val numDynamicPartitions = partition.values.count(_.isEmpty)
     val numStaticPartitions = partition.values.count(_.nonEmpty)
     val partitionSpec = partition.map {
+      case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
       case (key, Some(value)) => key -> value
       case (key, None) => key -> ""
     }
@@ -229,6 +230,7 @@ case class InsertIntoHiveTable(
             val caseInsensitiveDpMap = CaseInsensitiveMap(dpMap)
 
             val updatedPartitionSpec = partition.map {
+              case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
               case (key, Some(value)) => key -> value
               case (key, None) if caseInsensitiveDpMap.contains(key) =>
                 key -> caseInsensitiveDpMap(key)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
