Hi all, This is the bi-weekly Apache Spark digest from the Databricks OSS team. For each API/configuration/behavior change, there will be an *[API]* tag in the title.
CORE <https://github.com/databricks/runtime/wiki/_new#30spark-31559yarn-re-obtain-tokens-at-the-startup-of-am-for-yarn-cluster-mode-if-principal-and-keytab-are-available-14--1>[3.0][SPARK-31559][YARN] Re-obtain tokens at the startup of AM for yarn cluster mode if principal and keytab are available (+14, -1)> <https://github.com/apache/spark/commit/842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a> Re-obtain tokens at the start of AM for yarn cluster mode, if principal and keytab are available. It basically transfers the credentials from the original user, so this patch puts the new tokens into credentials from the original user via overwriting. Submitter will obtain delegation tokens for yarn-cluster mode, and add these credentials to the launch context. AM will be launched with these credentials, and AM and driver are able to leverage these tokens. In Yarn cluster mode, driver is launched in AM, which in turn initializes token manager (while initializing SparkContext) and obtain delegation tokens (+ schedule to renew) if both principal and keytab are available. <https://github.com/databricks/runtime/wiki/_new#24spark-31399core-support-indylambda-scala-closure-in-closurecleaner-434--47>[2.4][SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner (+434, -47)> <https://github.com/apache/spark/commit/dc01b7556f74e4a9873ceb1f78bc7df4e2ab4a8a> There had been previous efforts to extend Spark's ClosureCleaner to support "indylambda" Scala closures, which is necessary for proper Scala 2.12 support. Most notably the work is done at SPARK-14540 <https://issues.apache.org/jira/browse/SPARK-14540>. But the previous efforts had missed one import scenario: a Scala closure declared in a Scala REPL, and it captures the enclosing this -- a REPL line object. This PR proposes to enhance Spark's ClosureCleaner to support "indylambda" style of Scala closures to the same level as the existing implementation for the old (inner class) style ones. The goal is to reach feature parity with the support of the old style Scala closures, with as close to bug-for-bug compatibility as possible. <https://github.com/databricks/runtime/wiki/_new#30spark-31743core-add-spark_info-metric-into-prometheusresource-2--0>[3.0][SPARK-31743][CORE] Add spark_info metric into PrometheusResource (+2, -0)> <https://github.com/apache/spark/commit/64795f9e0c85a999bf808432d0d533843bea0a31> Add spark_info metric into PrometheusResource. $ bin/spark-shell --driver-memory 4G -c spark.ui.prometheus.enabled=true $ curl -s http://localhost:4041/metrics/executors/prometheus/ | head -n1 spark_info{version="3.1.0", revision="097d5098cca987e5f7bbb8394783c01517ebed0f"} 1.0 <https://github.com/databricks/runtime/wiki/_new#api31spark-20732core-decommission-cache-blocks-to-other-executors-when-an-executor-is-decommissioned-409--13>[API][3.1][SPARK-20732][CORE] Decommission cache blocks to other executors when an executor is decommissioned (+409, -13)> <https://github.com/apache/spark/commit/c560428fe0113f17362bae2b369910049914696f> After changes in SPARK-20628 <https://issues.apache.org/jira/browse/SPARK-20628>, CoarseGrainedSchedulerBackend can decommission an executor and stop assigning new tasks on it. We should also decommission the corresponding blockmanagers in the same way. i.e. Move the cached RDD blocks from those executors to other active executors. It introduces 3 new configurations: Config NameDescriptionDefault Value spark.storage.decommission.enabled Whether to decommission the block manager when decommissioning executor false spark.storage.decommission.maxReplicationFailuresPerBlock Maximum number of failures which can be handled for the replication of one RDD block when block manager is decommissioning and trying to move its existing blocks. 3 spark.storage.decommission.replicationReattemptInterval The interval of time between consecutive cache block replication reattempts happening on each decommissioning executor (due to storage decommissioning). 30s <https://github.com/databricks/runtime/wiki/_new#sql>SQL <https://github.com/databricks/runtime/wiki/_new#api30spark-31365sql-enable-nested-predicate-pushdown-per-data-sources-186--100>[API][3.0][SPARK-31365][SQL] Enable nested predicate pushdown per data sources (+186, -100)> <https://github.com/apache/spark/commit/4952f1a03cc48d9f1c3d2539ffa19bf051e398bf> Replaces a config spark.sql.optimizer.nestedPredicatePushdown.enabled with spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources which can configure which v1 data sources are enabled with nested predicate pushdown, but the previous config is an all or nothing config, and applies on all the data sources. In order to not introduce an unexpected API breaking change after enabling nested predicate pushdown, we'd like to set nested predicate pushdown per data source. spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources [Default: "parquet,orc"] - A comma-separated list of data source short names or fully qualified data source implementation class names for which Spark tries to push down predicates for nested columns and/or names containing dots to data sources. This configuration is only effective with file-based data source in DSv1. Currently, Parquet implements both optimizations while ORC only supports predicates for names containing dots. The other data sources don't support this feature yet. So the default value is 'parquet,orc'. <https://github.com/databricks/runtime/wiki/_new#api30spark-31393sql-show-the-correct-alias-in-schema-for-expression-21--14>[API][3.0][SPARK-31393][SQL] Show the correct alias in schema for expression (+21, -14)> <https://github.com/apache/spark/commit/a89006aba03a623960e5c4c6864ca8c899c81db9> Some alias of expression can not display correctly in schema. This PR will fix them. - TimeWindow - MaxBy - MinBy - UnaryMinus - BitwiseCount <https://github.com/databricks/runtime/wiki/_new#api30spark-31595sql-spark-sql-cli-should-allow-unescaped-quote-mark-in-quoted-string-16--2>[API][3.0][SPARK-31595][SQL] Spark SQL CLI should allow unescaped quote mark in quoted string (+16, -2)> <https://github.com/apache/spark/commit/53a9bf8fece7322312cbe93c9224c04f645a0f5e> Spark SQL CLI cannot handle unescaped quote mark like "'" or '"' correctly. When there are unmatched quotes in a string, it will not drop off semicolon as expected. <https://github.com/databricks/runtime/wiki/_new#api30spark-31647sql-deprecate-sparksqloptimizermetadataonly-configuration-8--3>[API][3.0][SPARK-31647][SQL] Deprecate 'spark.sql.optimizer.metadataOnly' configuration (+8, -3)> <https://github.com/apache/spark/commit/5c5dd77d6a29b014b3fe4b4015f5c7199650a378> Deprecates 'spark.sql.optimizer.metadataOnly' configuration and plan to remove it in the future releases. This optimization can cause a potential correctness issue, see also SPARK-26709 <https://issues.apache.org/jira/browse/SPARK-26709>. Also, it seems difficult to extend the optimization. Basically you should whitelist all available functions. It costs some maintenance overhead, see also SPARK-31590 <https://issues.apache.org/jira/browse/SPARK-31590>. <https://github.com/databricks/runtime/wiki/_new#24spark-31663sql-grouping-sets-with-having-clause-returns-the-wrong-result-145--39>[2.4][SPARK-31663][SQL] Grouping sets with having clause returns the wrong result (+145, -39)> <https://github.com/apache/spark/commit/86bd37f37eb1e534c520dc9a02387debf9fa05a1> GROUPING SETS with a HAVING clause could return a wrong result when the condition of HAVING contains the conflicting naming. See the below example: select sum(a) as b FROM VALUES (1, 10), (2, 20) AS T(a, b) group by GROUPING SETS ((b), (a, b)) having b > 10 The b in having b > 10 should be resolved as T.b not sum(a), so the right result should be +---+ | b| +---+ | 2| | 2| +---+ instead of an empty result set. The root cause is similar to SPARK-31519 <https://issues.apache.org/jira/browse/SPARK-31519>. It is caused by the behavior that we parsed HAVING as Filter(..., Agg(...)) and resolved these two parts in different rules. The CUBE and ROLLUP have the same issue in query analysis. This PR is to fix this correctness bug. <https://github.com/databricks/runtime/wiki/_new#api24spark-31692sql-pass-hadoop-confs--specifed-via-spark-confs-to-urlstreamhandlerfactory-58--3>[API][2.4][SPARK-31692][SQL] Pass hadoop confs specifed via Spark confs to URLStreamHandlerfactory (+58, -3)> <https://github.com/apache/spark/commit/72601460ada41761737f39d5dff8e69444fce2ba> BEFORE ➜ spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem scala> spark.sharedState res0: org.apache.spark.sql.internal.SharedState = org.apache.spark.sql.internal.SharedState@5793cd84 scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream res1: java.io.InputStream = org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream@22846025 scala> import org.apache.hadoop.fs._import org.apache.hadoop.fs._ scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration) res2: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@5a930c03 AFTER ➜ spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem scala> spark.sharedState res0: org.apache.spark.sql.internal.SharedState = org.apache.spark.sql.internal.SharedState@5c24a636 scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream res1: java.io.InputStream = org.apache.hadoop.fs.FSDataInputStream@2ba8f528 scala> import org.apache.hadoop.fs._import org.apache.hadoop.fs._ scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration) res2: org.apache.hadoop.fs.FileSystem = LocalFS scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration).getClass res3: Class[_ <: org.apache.hadoop.fs.FileSystem] = class org.apache.hadoop.fs.RawLocalFileSystem As shown above, the FileSystem instance via a URL can be different because the Hadoop configuration was not respected in the registered FsUrlStreamHandlerFactory that handles the URLs. <https://github.com/databricks/runtime/wiki/_new#api30spark-30973sql-scripttransformationexec-should-wait-for-the-termination-56--1>[API][3.0][SPARK-30973][SQL] ScriptTransformationExec should wait for the termination (+56, -1)> <https://github.com/apache/spark/commit/ddbce4edee6d4de30e6900bc0f03728a989aef0a> Before the fix, when users use the TRANSFORM with a Python script that contains a parser error, Spark successfully finishes and returns an empty result set. After the fix, Spark fails and shows the exception properly. For example, # encoding: utf8import unknow_moduleimport sys for line in sys.stdin: print line spark.range(100*100).toDF("index").createOrReplaceTempView("test") spark.sql("select TRANSFORM(index) USING 'python error_python.py' as new_index from test").collect.foreach(println) <https://github.com/databricks/runtime/wiki/_new#api30spark-31405sql-fail-by-default-when-readingwriting-legacy-datetime-values-fromto-parquetavro-files-519--319>[API][3.0][SPARK-31405][SQL] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files (+519, -319)> <https://github.com/apache/spark/commit/fd2d55c9919ece5463377bc6f45f2cdb8bf90515> When reading/writing datetime values that before the rebase switch day, from/to Avro/Parquet files, fail by default and ask users to set a config to explicitly do rebase or not. After changes, users will see an error when reading/writing dates before 1582-10-15 or timestamps before 1900-01-01 from/to Parquet/Avro files, with an error message to ask setting a config. <https://github.com/databricks/runtime/wiki/_new#api30spark-31620sql-fix-reference-binding-failure-in-case-of-an-final-agg-contains-subquery-69--6>[API][3.0][SPARK-31620][SQL] Fix reference binding failure in case of an final agg contains subquery (+69, -6)> <https://github.com/apache/spark/commit/d8b001fa872f735df4344321c33780a892da9b41> When planning aggregates, the partial aggregate uses agg fucs' inputAggBufferAttributes as its output, see https://github.com/apache/spark/blob/v3.0.0-rc1/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L105 For the final HashAggregateExec, we need to bind the DeclarativeAggregate.mergeExpressions with the output of the partial aggregate operator, see https://github.com/apache/spark/blob/v3.0.0-rc1/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L348 This is usually fine. However, if we copy the agg func somehow after agg planning, like PlanSubqueries, the DeclarativeAggregate will be replaced by a new instance with new inputAggBufferAttributes and mergeExpressions. Then we can't bind the mergeExpressions with the output of the partial aggregate operator, as it uses the inputAggBufferAttributes of the original DeclarativeAggregate before copy. Instead of using child.output directly, we should use inputAggBufferAttributes from the current agg expression for Final and PartialMerge aggregates to bind references for their mergeExpression. <https://github.com/databricks/runtime/wiki/_new#api30spark-31701rsql-bump-up-the-minimum-arrow-version-as-0151-in-sparkr-5--10>[API][3.0][SPARK-31701][R][SQL] Bump up the minimum Arrow version as 0.15.1 in SparkR (+5, -10)> <https://github.com/apache/spark/commit/e1315cd65631823123af0d14771b0f699809251b> It will reduce the maintenance overhead to match the Arrow versions, and minimize the supported range. SparkR Arrow optimization is experimental yet. <https://github.com/databricks/runtime/wiki/_new#api30spark-31707sql-revert-spark-30098-use-default-datasource-as-provider-for-create-table-syntax-91--167>[API][3.0][SPARK-31707][SQL] Revert SPARK-30098 Use default datasource as provider for CREATE TABLE syntax (+91, -167)> <https://github.com/apache/spark/commit/d2bec5e265e0aa4fa527c3f43cfe738cdbdc4598> Revert SPARK-30098 <https://issues.apache.org/jira/browse/SPARK-30098>, which brought confusion and frustration on using create table DDL query, and we agreed about the bad effect on the change. Please read the discussion thread <http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Resolve-ambiguous-parser-rule-between-two-quot-create-table-quot-s-td29051i20.html> for more details. <https://github.com/databricks/runtime/wiki/_new#31spark-31684sql-overwrite-partition-failed-with-wrong-fs-when-the-target-partition-is-not-belong-to-the-filesystem-as-same-as-the-table-209--4>[3.1][SPARK-31684][SQL] Overwrite partition failed with 'WRONG FS' when the target partition is not belong to the filesystem as same as the table (+209, -4)> <https://github.com/apache/spark/commit/1f29f1ba5879c324b0caab70d6962eb3ba24549c> With SPARK-18107 <https://issues.apache.org/jira/browse/SPARK-18107>, we will disable the underlying replace(overwrite) and instead do delete in Spark side and only do copy in Hive side to bypass the performance issue, see also HIVE-11940 <https://issues.apache.org/jira/browse/HIVE-11940>. Conditionally, if the table location and partition location do not belong to the same FileSystem, we should not disable hive overwrite. Otherwise, Hive will use the FileSystem instance belong to the table location to copy files, which will fail in FileSystem#checkPath https://github.com/apache/hive/blob/rel/release-2.3.7/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L1657 In this PR, for Hive 2.0.0 and onwards, as HIVE-11940 <https://issues.apache.org/jira/browse/HIVE-11940> has been fixed, and there is no performance issue anymore. We should leave the overwrite logic to hive to avoid failure in FileSystem#checkPath <https://github.com/databricks/runtime/wiki/_new#ml>ML <https://github.com/databricks/runtime/wiki/_new#api30spark-31610spark-31668ml-address-hashingtf-savingloading-bug-and-expose-hashfunc-property-in-hashingtf-35--9>[API][3.0][SPARK-31610][SPARK-31668][ML] Address hashingTF saving&loading bug and expose hashFunc property in HashingTF (+35, -9)> <https://github.com/apache/spark/commit/e248bc7af6086cde7dd89a51459ae6a221a600c8> - Address hashingTF saving&loading bug - Exposes hashFunc property in HashingTF <https://github.com/databricks/runtime/wiki/_new#api31spark-30642mlpyspark-linearsvc-blockify-input-vectors-498--91>[API][3.1][SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors (+498, -91)> <https://github.com/apache/spark/commit/ebdf41dd698ce138d07f63b1fa3ffbcc392e7fff> Adds a new param blockSize to LinearSVC to: 1. reduce RAM to persist traing dataset (save about 40% RAM) 2. use Level-2 BLAS routines (4x ~ 5x faster on dataset epsilon). If blockSize==1, keep original behavior, and if blockSize>1, stack input vectors to blocks (like ALS/MLP) and standardize the input outside of optimization procedure. <https://github.com/databricks/runtime/wiki/_new#api31spark-30659mlpyspark-logisticregression-blockify-input-vectors-856--362>[API][3.1][SPARK-30659][ML][PYSPARK] LogisticRegression blockify input vectors (+856, -362)> <https://github.com/apache/spark/commit/052ff49acd5820dc8415d4b9d14c4aadc4438df4> Reorganizes the fit method in LR to several blocks (createModel, createBounds, createOptimizer, createInitCoefWithInterceptMatrix) and adds a new param blockSize to: 1. reduce RAM to persist traing dataset (save about 40% RAM) 2. to use Level-2 BLAS routines (4x ~ 5x faster). if blockSize==1, keep original behavior, code path trainOnRows, and if blockSize>1, standardize and stack input vectors to blocks (like ALS/MLP), code path trainOnBlocks. <https://github.com/databricks/runtime/wiki/_new#31spark-30660mlpyspark-linearregression-blockify-input-vectors-598--211>[3.1][SPARK-30660][ML][PYSPARK] LinearRegression blockify input vectors (+598, -211)> <https://github.com/apache/spark/commit/97332f26bf58a0626f3df78caaba6bbc79f27a11> Adds a new param blockSize to LinearRegression to: 1. reduce RAM to persist traing dataset (save about 40% RAM) 2. to use Level-2 BLAS routines (up to 6X(squaredError)~12X(huber) speedup). If blockSize==1, keep original behavior, and if blockSize>1, stack input vectors to blocks (like ALS/MLP) and standardize the input outside of optimization procedure. <https://github.com/databricks/runtime/wiki/_new#api31spark-30699mlpyspark-gmm-blockify-input-vectors-325--76>[API][3.1][SPARK-30699][ML][PYSPARK] GMM blockify input vectors (+325, -76)> <https://github.com/apache/spark/commit/e7fa778dc7a695d3b1426de6f98a401f2fb98f39> Adds a new param blockSize to GMM to obtain performance gain on dense dataset HIGGS: 1. save about 45% RAM 2. 3X faster with openBLAS If blockSize==1, keep original behavior, code path trainOnRows, and if blockSize>1, standardize and stack input vectors to blocks (like ALS/MLP), code path trainOnBlocks. <https://github.com/databricks/runtime/wiki/_new#api31spark-31127ml-implement-abstract-selector-562--886>[API][3.1][SPARK-31127][ML] Implement abstract Selector (+562, -886)> <https://github.com/apache/spark/commit/f05560bf501aef407de38ef6bd70ffb2863e2fa8> Implements abstract Selector to reuse codes. Puts the common code among ANOVASelector, ChiSqSelector, FValueSelector and VarianceThresholdSelector to Selector. <https://github.com/databricks/runtime/wiki/_new#api31spark-31609mlpyspark-add-variancethresholdselector-to-pyspark-142--0>[API][3.1][SPARK-31609][ML][PYSPARK] Add VarianceThresholdSelector to PySpark (+142, -0)> <https://github.com/apache/spark/commit/09ece50799222d577009a2bbd480304d1ae1e14e> Adds VarianceThresholdSelector to PySpark to make it consistent between Scala and Python. <https://github.com/databricks/runtime/wiki/_new#api31spark-31652mlpyspark-add-anovaselector-and-fvalueselector-to-pyspark-306--85>[API][3.1][SPARK-31652][ML][PYSPARK] Add ANOVASelector and FValueSelector to PySpark (+306, -85)> <https://github.com/apache/spark/commit/18d2ba53e46a0e1685d51c17bc9ed830fb33de09> Adds ANOVASelector and FValueSelector to PySpark since they have been implemented in Scala. <https://github.com/databricks/runtime/wiki/_new#api31spark-31656mlpyspark-aft-blockify-input-vectors-277--79>[API][3.1][SPARK-31656][ML][PYSPARK] AFT blockify input vectors (+277, -79)> <https://github.com/apache/spark/commit/bb9b50c2172bc1aa7d261626aa74f9be6d1d5e79> Adds a new param blockSize to AFT to obtain performance gain on dense datasets, such as epsilon: 1. reduce RAM to persist traing dataset (save about 40% RAM) 2. use Level-2 BLAS routines (~10X speedup) If blockSize==1, keep original behavior, and if blockSize>1, stack input vectors to blocks (like ALS/MLP) and standardize the input outside of optimization procedure. <https://github.com/databricks/runtime/wiki/_new#api31spark-31667mlpyspark-python-side-flatten-the-result-dataframe-of-anovatestchisqtestfvaluetest-48--12>[API][3.1][SPARK-31667][ML][PYSPARK] Python side flatten the result dataframe of ANOVATest/ChisqTest/FValueTest (+48, -12)> <https://github.com/apache/spark/commit/7a670b5a0a44dbfb8a9504e9d45366da6deb3c27> Adds Python version of Since("3.1.0") def test( dataset: DataFrame, featuresCol: String, labelCol: String, flatten: Boolean): DataFrame to make it consistent between Scala and Python. <https://github.com/databricks/runtime/wiki/_new#24spark-31676ml-quantilediscretizer-raise-error-parameter-splits-given-invalid-value-splits-array-includes--00-and-00-30--0>[2.4][SPARK-31676][ML] QuantileDiscretizer raise error parameter splits given invalid value (splits array includes -0.0 and 0.0) (+30, -0)> <https://github.com/apache/spark/commit/b2300fca1e1a22d74c6eeda37942920a6c6299ff> In QuantileDiscretizer.getDistinctSplits, before invoking distinct, normalize all -0.0 and 0.0 to be 0.0 for (i <- 0 until splits.length) { if (splits(i) == -0.0) { splits(i) = 0.0 } } <https://github.com/databricks/runtime/wiki/_new#30spark-31681mlpyspark-python-multiclass-logistic-regression-evaluate-should-return-logisticregressionsummary-9--2>[3.0][SPARK-31681][ML][PYSPARK] Python multiclass logistic regression evaluate should return LogisticRegressionSummary (+9, -2)> <https://github.com/apache/spark/commit/e10516ae63cfc58f2d493e4d3f19940d45c8f033> Return LogisticRegressionSummary for multiclass logistic regression evaluate in PySpark. <https://github.com/databricks/runtime/wiki/_new#other>OTHER <https://github.com/databricks/runtime/wiki/_new#24spark-31655build-upgrade-snappy-java-to-1175-4--4>[2.4][SPARK-31655][BUILD] Upgrade snappy-java to 1.1.7.5 (+4, -4)> <https://github.com/apache/spark/commit/0d9faf602ec1c381c56926c432268869ee1df19c>