Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, there will be an *[API]* tag in
the title.

CORE
<https://github.com/databricks/runtime/wiki/_new#30spark-31559yarn-re-obtain-tokens-at-the-startup-of-am-for-yarn-cluster-mode-if-principal-and-keytab-are-available-14--1>[3.0][SPARK-31559][YARN]
Re-obtain tokens at the startup of AM for yarn cluster mode if principal
and keytab are available (+14, -1)>
<https://github.com/apache/spark/commit/842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a>

Re-obtain tokens at the startup of the AM in yarn-cluster mode if a principal
and keytab are available. The AM basically inherits the credentials from the
original user, so this patch overwrites the original user's credentials with
the newly obtained tokens.

The submitter obtains delegation tokens for yarn-cluster mode and adds these
credentials to the launch context. The AM is launched with these credentials,
and both the AM and the driver can leverage these tokens.

In yarn-cluster mode, the driver is launched in the AM, which in turn
initializes the token manager (while initializing the SparkContext) and
obtains delegation tokens (and schedules their renewal) if both a principal
and a keytab are available.
<https://github.com/databricks/runtime/wiki/_new#24spark-31399core-support-indylambda-scala-closure-in-closurecleaner-434--47>[2.4][SPARK-31399][CORE]
Support indylambda Scala closure in ClosureCleaner (+434, -47)>
<https://github.com/apache/spark/commit/dc01b7556f74e4a9873ceb1f78bc7df4e2ab4a8a>

There had been previous efforts to extend Spark's ClosureCleaner to support
"indylambda" Scala closures, which is necessary for proper Scala 2.12
support. Most notably, that work was done in SPARK-14540
<https://issues.apache.org/jira/browse/SPARK-14540>.

However, the previous efforts missed one important scenario: a Scala closure
declared in the Scala REPL that captures the enclosing this -- a REPL line
object.

This PR proposes to enhance Spark's ClosureCleaner to support "indylambda"
style of Scala closures to the same level as the existing implementation
for the old (inner class) style ones. The goal is to reach feature parity
with the support of the old style Scala closures, with as close to
bug-for-bug compatibility as possible.
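
A minimal sketch of the REPL scenario described above (hypothetical; the
variable and RDD are made up for illustration):

// In a Scala 2.12 spark-shell: `factor` lives in a REPL line object, so the
// indylambda closure below captures the enclosing `this` (the line object),
// which is the case the enhanced ClosureCleaner now handles.
val factor = 3
sc.parallelize(1 to 5).map(_ * factor).collect()
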
<https://github.com/databricks/runtime/wiki/_new#30spark-31743core-add-spark_info-metric-into-prometheusresource-2--0>[3.0][SPARK-31743][CORE]
Add spark_info metric into PrometheusResource (+2, -0)>
<https://github.com/apache/spark/commit/64795f9e0c85a999bf808432d0d533843bea0a31>

Add spark_info metric into PrometheusResource.

$ bin/spark-shell --driver-memory 4G -c spark.ui.prometheus.enabled=true

$ curl -s http://localhost:4041/metrics/executors/prometheus/ | head -n1
spark_info{version="3.1.0",
revision="097d5098cca987e5f7bbb8394783c01517ebed0f"} 1.0

<https://github.com/databricks/runtime/wiki/_new#api31spark-20732core-decommission-cache-blocks-to-other-executors-when-an-executor-is-decommissioned-409--13>[API][3.1][SPARK-20732][CORE]
Decommission cache blocks to other executors when an executor is
decommissioned (+409, -13)>
<https://github.com/apache/spark/commit/c560428fe0113f17362bae2b369910049914696f>

After changes in SPARK-20628
<https://issues.apache.org/jira/browse/SPARK-20628>,
CoarseGrainedSchedulerBackend can decommission an executor and stop
assigning new tasks to it. We should also decommission the corresponding
block managers in the same way, i.e., move the cached RDD blocks from those
executors to other active executors. It introduces 3 new configurations (a
usage sketch follows the list):

   - spark.storage.decommission.enabled [Default: false]
   Whether to decommission the block manager when decommissioning the
   executor.
   - spark.storage.decommission.maxReplicationFailuresPerBlock [Default: 3]
   Maximum number of failures which can be handled for the replication of
   one RDD block when the block manager is decommissioning and trying to
   move its existing blocks.
   - spark.storage.decommission.replicationReattemptInterval [Default: 30s]
   The interval of time between consecutive cache block replication
   reattempts happening on each decommissioning executor (due to storage
   decommissioning).
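
For illustration, a minimal sketch (assumed usage, not taken from the patch)
that enables the feature with the config names above when building a session:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.storage.decommission.enabled", "true")
  .config("spark.storage.decommission.maxReplicationFailuresPerBlock", "3")
  .config("spark.storage.decommission.replicationReattemptInterval", "30s")
  .getOrCreate()
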
<https://github.com/databricks/runtime/wiki/_new#sql>SQL
<https://github.com/databricks/runtime/wiki/_new#api30spark-31365sql-enable-nested-predicate-pushdown-per-data-sources-186--100>[API][3.0][SPARK-31365][SQL]
Enable nested predicate pushdown per data sources (+186, -100)>
<https://github.com/apache/spark/commit/4952f1a03cc48d9f1c3d2539ffa19bf051e398bf>

Replaces the config spark.sql.optimizer.nestedPredicatePushdown.enabled with
spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources, which
configures which v1 data sources have nested predicate pushdown enabled; the
previous config was all-or-nothing and applied to all data sources.

To avoid introducing an unexpected API-breaking change after enabling nested
predicate pushdown, we'd like to control nested predicate pushdown per data
source.

spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources [Default:
"parquet,orc"]

   - A comma-separated list of data source short names or fully qualified
   data source implementation class names for which Spark tries to push down
   predicates for nested columns and/or names containing dots to data
   sources. This configuration is only effective with file-based data source
   in DSv1. Currently, Parquet implements both optimizations while ORC only
   supports predicates for names containing dots. The other data sources
   don't support this feature yet. So the default value is 'parquet,orc'.
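
For example, a small sketch (assumed usage) that limits the pushdown to
Parquet only:

spark.conf.set(
  "spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources",
  "parquet")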

<https://github.com/databricks/runtime/wiki/_new#api30spark-31393sql-show-the-correct-alias-in-schema-for-expression-21--14>[API][3.0][SPARK-31393][SQL]
Show the correct alias in schema for expression (+21, -14)>
<https://github.com/apache/spark/commit/a89006aba03a623960e5c4c6864ca8c899c81db9>

Some expression aliases were not displayed correctly in the schema. This PR
fixes them for the following expressions (a quick check is sketched after
the list):

   - TimeWindow
   - MaxBy
   - MinBy
   - UnaryMinus
   - BitwiseCount
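
As a quick way to see the alias an expression gets in the schema (a
hypothetical check, not from the PR), e.g. for MaxBy:

spark.sql("SELECT max_by(a, b) FROM VALUES (1, 10), (2, 20) AS t(a, b)").printSchema()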

<https://github.com/databricks/runtime/wiki/_new#api30spark-31595sql-spark-sql-cli-should-allow-unescaped-quote-mark-in-quoted-string-16--2>[API][3.0][SPARK-31595][SQL]
Spark SQL CLI should allow unescaped quote mark in quoted string (+16, -2)>
<https://github.com/apache/spark/commit/53a9bf8fece7322312cbe93c9224c04f645a0f5e>

The Spark SQL CLI could not handle an unescaped quote mark like "'" or '"'
correctly: when a string contained an unmatched quote, the trailing semicolon
was not dropped as expected.
<https://github.com/databricks/runtime/wiki/_new#api30spark-31647sql-deprecate-sparksqloptimizermetadataonly-configuration-8--3>[API][3.0][SPARK-31647][SQL]
Deprecate 'spark.sql.optimizer.metadataOnly' configuration (+8, -3)>
<https://github.com/apache/spark/commit/5c5dd77d6a29b014b3fe4b4015f5c7199650a378>

Deprecates the 'spark.sql.optimizer.metadataOnly' configuration, with a plan
to remove it in future releases.

This optimization can cause a potential correctness issue; see also
SPARK-26709 <https://issues.apache.org/jira/browse/SPARK-26709>. It is also
difficult to extend: basically every applicable function has to be
whitelisted, which incurs maintenance overhead; see also SPARK-31590
<https://issues.apache.org/jira/browse/SPARK-31590>.
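
Until it is removed, the deprecated flag can still be set explicitly; a
minimal sketch:

spark.conf.set("spark.sql.optimizer.metadataOnly", "false")
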
<https://github.com/databricks/runtime/wiki/_new#24spark-31663sql-grouping-sets-with-having-clause-returns-the-wrong-result-145--39>[2.4][SPARK-31663][SQL]
Grouping sets with having clause returns the wrong result (+145, -39)>
<https://github.com/apache/spark/commit/86bd37f37eb1e534c520dc9a02387debf9fa05a1>

GROUPING SETS with a HAVING clause could return a wrong result when the
HAVING condition contains a conflicting name. See the example below:

select sum(a) as b FROM VALUES (1, 10), (2, 20) AS T(a, b)
group by GROUPING SETS ((b), (a, b)) having b > 10

The b in having b > 10 should be resolved as T.b, not sum(a) as b, so the
correct result should be

+---+
|  b|
+---+
|  2|
|  2|
+---+

instead of an empty result set.

The root cause is similar to SPARK-31519
<https://issues.apache.org/jira/browse/SPARK-31519>: HAVING is parsed as
Filter(..., Agg(...)) and the two parts are resolved by different rules. CUBE
and ROLLUP have the same issue in query analysis. This PR fixes this
correctness bug.
<https://github.com/databricks/runtime/wiki/_new#api24spark-31692sql-pass-hadoop-confs--specifed-via-spark-confs-to-urlstreamhandlerfactory-58--3>[API][2.4][SPARK-31692][SQL]
Pass hadoop confs specified via Spark confs to URLStreamHandlerFactory (+58,
-3)>
<https://github.com/apache/spark/commit/72601460ada41761737f39d5dff8e69444fce2ba>

BEFORE

➜  spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf
spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem

scala> spark.sharedState
res0: org.apache.spark.sql.internal.SharedState =
org.apache.spark.sql.internal.SharedState@5793cd84

scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream
res1: java.io.InputStream =
org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream@22846025

scala> import org.apache.hadoop.fs._
import org.apache.hadoop.fs._

scala>  FileSystem.get(new Path("file:///tmp/1.txt").toUri,
spark.sparkContext.hadoopConfiguration)
res2: org.apache.hadoop.fs.FileSystem =
org.apache.hadoop.fs.LocalFileSystem@5a930c03

AFTER

➜  spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf
spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem

scala> spark.sharedState
res0: org.apache.spark.sql.internal.SharedState =
org.apache.spark.sql.internal.SharedState@5c24a636

scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream
res1: java.io.InputStream = org.apache.hadoop.fs.FSDataInputStream@2ba8f528

scala> import org.apache.hadoop.fs._
import org.apache.hadoop.fs._

scala>  FileSystem.get(new Path("file:///tmp/1.txt").toUri,
spark.sparkContext.hadoopConfiguration)
res2: org.apache.hadoop.fs.FileSystem = LocalFS

scala>  FileSystem.get(new Path("file:///tmp/1.txt").toUri,
spark.sparkContext.hadoopConfiguration).getClass
res3: Class[_ <: org.apache.hadoop.fs.FileSystem] = class
org.apache.hadoop.fs.RawLocalFileSystem

As shown above, the FileSystem instance resolved via a URL could differ from
the configured one, because the Hadoop configuration was not respected by the
registered FsUrlStreamHandlerFactory that handles such URLs.
<https://github.com/databricks/runtime/wiki/_new#api30spark-30973sql-scripttransformationexec-should-wait-for-the-termination-56--1>[API][3.0][SPARK-30973][SQL]
ScriptTransformationExec should wait for the termination (+56, -1)>
<https://github.com/apache/spark/commit/ddbce4edee6d4de30e6900bc0f03728a989aef0a>

Before the fix, when users used TRANSFORM with a Python script that contains
a parse error, Spark finished successfully and returned an empty result set.
After the fix, Spark fails and surfaces the exception properly.

For example,

# encoding: utf8
import unknow_module
import sys
for line in sys.stdin:
    print line

spark.range(100*100).toDF("index").createOrReplaceTempView("test")
spark.sql("select TRANSFORM(index) USING 'python error_python.py' as
new_index from test").collect.foreach(println)

<https://github.com/databricks/runtime/wiki/_new#api30spark-31405sql-fail-by-default-when-readingwriting-legacy-datetime-values-fromto-parquetavro-files-519--319>[API][3.0][SPARK-31405][SQL]
Fail by default when reading/writing legacy datetime values from/to
Parquet/Avro files (+519, -319)>
<https://github.com/apache/spark/commit/fd2d55c9919ece5463377bc6f45f2cdb8bf90515>

When reading/writing datetime values that are before the rebase switch day
from/to Avro/Parquet files, fail by default and ask users to set a config to
explicitly choose whether to rebase.

After the changes, users will see an error when reading/writing dates before
1582-10-15 or timestamps before 1900-01-01 from/to Parquet/Avro files, with
an error message asking them to set a config.
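
The digest does not name the configs; assuming the Spark 3.0 names, the
behavior is controlled by the spark.sql.legacy.*.datetimeRebaseMode* settings,
e.g.:

// Opt in to rebasing legacy dates/timestamps instead of failing (EXCEPTION is
// the default); CORRECTED would read/write the values as-is.
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
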
<https://github.com/databricks/runtime/wiki/_new#api30spark-31620sql-fix-reference-binding-failure-in-case-of-an-final-agg-contains-subquery-69--6>[API][3.0][SPARK-31620][SQL]
Fix reference binding failure in case a final agg contains a subquery (+69,
-6)>
<https://github.com/apache/spark/commit/d8b001fa872f735df4344321c33780a892da9b41>

When planning aggregates, the partial aggregate uses the agg funcs'
inputAggBufferAttributes as its output, see
https://github.com/apache/spark/blob/v3.0.0-rc1/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L105

For the final HashAggregateExec, we need to bind the
DeclarativeAggregate.mergeExpressions with the output of the partial
aggregate operator, see
https://github.com/apache/spark/blob/v3.0.0-rc1/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L348

This is usually fine. However, if we copy the agg func somehow after agg
planning, e.g. in PlanSubqueries, the DeclarativeAggregate is replaced by a
new instance with new inputAggBufferAttributes and mergeExpressions. Then we
cannot bind the mergeExpressions with the output of the partial aggregate
operator, as it uses the inputAggBufferAttributes of the original
DeclarativeAggregate from before the copy.

Instead of using child.output directly, we should use
inputAggBufferAttributes from the current agg expression for Final and
PartialMerge aggregates to bind references for their mergeExpression.
<https://github.com/databricks/runtime/wiki/_new#api30spark-31701rsql-bump-up-the-minimum-arrow-version-as-0151-in-sparkr-5--10>[API][3.0][SPARK-31701][R][SQL]
Bump up the minimum Arrow version as 0.15.1 in SparkR (+5, -10)>
<https://github.com/apache/spark/commit/e1315cd65631823123af0d14771b0f699809251b>

It reduces the maintenance overhead of matching Arrow versions and minimizes
the supported range. SparkR's Arrow optimization is still experimental.
<https://github.com/databricks/runtime/wiki/_new#api30spark-31707sql-revert-spark-30098-use-default-datasource-as-provider-for-create-table-syntax-91--167>[API][3.0][SPARK-31707][SQL]
Revert SPARK-30098 Use default datasource as provider for CREATE TABLE
syntax (+91, -167)>
<https://github.com/apache/spark/commit/d2bec5e265e0aa4fa527c3f43cfe738cdbdc4598>

Reverts SPARK-30098 <https://issues.apache.org/jira/browse/SPARK-30098>,
which brought confusion and frustration around the CREATE TABLE DDL, and we
agreed the change had a bad effect. Please read the discussion thread
<http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Resolve-ambiguous-parser-rule-between-two-quot-create-table-quot-s-td29051i20.html>
for more details.
<https://github.com/databricks/runtime/wiki/_new#31spark-31684sql-overwrite-partition-failed-with-wrong-fs-when-the-target-partition-is-not-belong-to-the-filesystem-as-same-as-the-table-209--4>[3.1][SPARK-31684][SQL]
Overwrite partition failed with 'WRONG FS' when the target partition does not
belong to the same filesystem as the table (+209, -4)>
<https://github.com/apache/spark/commit/1f29f1ba5879c324b0caab70d6962eb3ba24549c>

With SPARK-18107 <https://issues.apache.org/jira/browse/SPARK-18107>, we
disable the underlying replace (overwrite) and instead do the delete on the
Spark side and only the copy on the Hive side, to bypass the performance
issue; see also HIVE-11940 <https://issues.apache.org/jira/browse/HIVE-11940>.

However, if the table location and the partition location do not belong to
the same FileSystem, we should not disable the Hive overwrite. Otherwise,
Hive will use the FileSystem instance belonging to the table location to copy
files, which fails in FileSystem#checkPath:
https://github.com/apache/hive/blob/rel/release-2.3.7/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L1657

In this PR, for Hive 2.0.0 and onwards, since HIVE-11940
<https://issues.apache.org/jira/browse/HIVE-11940> has been fixed and there
is no performance issue anymore, we leave the overwrite logic to Hive to
avoid the failure in FileSystem#checkPath.
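
A hypothetical reproduction sketch of the scenario (the table, partition, and
locations are made up for illustration):

spark.sql("CREATE TABLE t (a INT) PARTITIONED BY (p INT) STORED AS PARQUET")
spark.sql("ALTER TABLE t ADD PARTITION (p = 1) LOCATION 's3a://bucket/t/p=1'")
// Before the fix, the overwrite below could fail with 'Wrong FS' in
// FileSystem#checkPath because Hive used the table location's FileSystem.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (p = 1) SELECT 1")
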
<https://github.com/databricks/runtime/wiki/_new#ml>ML
<https://github.com/databricks/runtime/wiki/_new#api30spark-31610spark-31668ml-address-hashingtf-savingloading-bug-and-expose-hashfunc-property-in-hashingtf-35--9>[API][3.0][SPARK-31610][SPARK-31668][ML]
Address hashingTF saving&loading bug and expose hashFunc property in
HashingTF (+35, -9)>
<https://github.com/apache/spark/commit/e248bc7af6086cde7dd89a51459ae6a221a600c8>

   - Addresses a HashingTF saving & loading bug
   - Exposes the hashFunc property in HashingTF

<https://github.com/databricks/runtime/wiki/_new#api31spark-30642mlpyspark-linearsvc-blockify-input-vectors-498--91>[API][3.1][SPARK-30642][ML][PYSPARK]
LinearSVC blockify input vectors (+498, -91)>
<https://github.com/apache/spark/commit/ebdf41dd698ce138d07f63b1fa3ffbcc392e7fff>

Adds a new param blockSize to LinearSVC to:

   1. reduce the RAM needed to persist the training dataset (saves about 40%
   RAM)
   2. use Level-2 BLAS routines (4x ~ 5x faster on the dataset epsilon).

If blockSize == 1, the original behavior is kept; if blockSize > 1, input
vectors are stacked into blocks (like ALS/MLP) and the input is standardized
outside of the optimization procedure.
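
A usage sketch (assuming the new param follows the usual ML param convention
with a setBlockSize setter, and a hypothetical training DataFrame):

import org.apache.spark.ml.classification.LinearSVC

val svc = new LinearSVC()
  .setMaxIter(10)
  .setBlockSize(64)  // > 1: stack input vectors into blocks, use Level-2 BLAS
val model = svc.fit(training)  // training: DataFrame with features/label columns
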
<https://github.com/databricks/runtime/wiki/_new#api31spark-30659mlpyspark-logisticregression-blockify-input-vectors-856--362>[API][3.1][SPARK-30659][ML][PYSPARK]
LogisticRegression blockify input vectors (+856, -362)>
<https://github.com/apache/spark/commit/052ff49acd5820dc8415d4b9d14c4aadc4438df4>

Reorganizes the fit method in LR into several blocks (createModel,
createBounds, createOptimizer, createInitCoefWithInterceptMatrix) and adds
a new param blockSize to:

   1. reduce the RAM needed to persist the training dataset (saves about 40%
   RAM)
   2. use Level-2 BLAS routines (4x ~ 5x faster).

If blockSize == 1, the original behavior is kept (code path trainOnRows); if
blockSize > 1, input vectors are standardized and stacked into blocks (like
ALS/MLP), code path trainOnBlocks.
<https://github.com/databricks/runtime/wiki/_new#31spark-30660mlpyspark-linearregression-blockify-input-vectors-598--211>[3.1][SPARK-30660][ML][PYSPARK]
LinearRegression blockify input vectors (+598, -211)>
<https://github.com/apache/spark/commit/97332f26bf58a0626f3df78caaba6bbc79f27a11>

Adds a new param blockSize to LinearRegression to:

   1. reduce the RAM needed to persist the training dataset (saves about 40%
   RAM)
   2. use Level-2 BLAS routines (up to 6X (squaredError) ~ 12X (huber)
   speedup).

If blockSize == 1, the original behavior is kept; if blockSize > 1, input
vectors are stacked into blocks (like ALS/MLP) and the input is standardized
outside of the optimization procedure.
<https://github.com/databricks/runtime/wiki/_new#api31spark-30699mlpyspark-gmm-blockify-input-vectors-325--76>[API][3.1][SPARK-30699][ML][PYSPARK]
GMM blockify input vectors (+325, -76)>
<https://github.com/apache/spark/commit/e7fa778dc7a695d3b1426de6f98a401f2fb98f39>

Adds a new param blockSize to GMM to obtain a performance gain on the dense
dataset HIGGS:

   1. saves about 45% RAM
   2. 3X faster with OpenBLAS

If blockSize == 1, the original behavior is kept (code path trainOnRows); if
blockSize > 1, input vectors are standardized and stacked into blocks (like
ALS/MLP), code path trainOnBlocks.
<https://github.com/databricks/runtime/wiki/_new#api31spark-31127ml-implement-abstract-selector-562--886>[API][3.1][SPARK-31127][ML]
Implement abstract Selector (+562, -886)>
<https://github.com/apache/spark/commit/f05560bf501aef407de38ef6bd70ffb2863e2fa8>

Implements an abstract Selector to reuse code.

Puts the common code among ANOVASelector, ChiSqSelector, FValueSelector and
VarianceThresholdSelector into Selector.
<https://github.com/databricks/runtime/wiki/_new#api31spark-31609mlpyspark-add-variancethresholdselector-to-pyspark-142--0>[API][3.1][SPARK-31609][ML][PYSPARK]
Add VarianceThresholdSelector to PySpark (+142, -0)>
<https://github.com/apache/spark/commit/09ece50799222d577009a2bbd480304d1ae1e14e>

Adds VarianceThresholdSelector to PySpark to make it consistent between
Scala and Python.
<https://github.com/databricks/runtime/wiki/_new#api31spark-31652mlpyspark-add-anovaselector-and-fvalueselector-to-pyspark-306--85>[API][3.1][SPARK-31652][ML][PYSPARK]
Add ANOVASelector and FValueSelector to PySpark (+306, -85)>
<https://github.com/apache/spark/commit/18d2ba53e46a0e1685d51c17bc9ed830fb33de09>

Adds ANOVASelector and FValueSelector to PySpark since they have been
implemented in Scala.
<https://github.com/databricks/runtime/wiki/_new#api31spark-31656mlpyspark-aft-blockify-input-vectors-277--79>[API][3.1][SPARK-31656][ML][PYSPARK]
AFT blockify input vectors (+277, -79)>
<https://github.com/apache/spark/commit/bb9b50c2172bc1aa7d261626aa74f9be6d1d5e79>

Adds a new param blockSize to AFT to obtain a performance gain on dense
datasets, such as epsilon:

   1. reduce the RAM needed to persist the training dataset (saves about 40%
   RAM)
   2. use Level-2 BLAS routines (~10X speedup)

If blockSize == 1, the original behavior is kept; if blockSize > 1, input
vectors are stacked into blocks (like ALS/MLP) and the input is standardized
outside of the optimization procedure.
<https://github.com/databricks/runtime/wiki/_new#api31spark-31667mlpyspark-python-side-flatten-the-result-dataframe-of-anovatestchisqtestfvaluetest-48--12>[API][3.1][SPARK-31667][ML][PYSPARK]
Python side flatten the result dataframe of ANOVATest/ChisqTest/FValueTest
(+48, -12)>
<https://github.com/apache/spark/commit/7a670b5a0a44dbfb8a9504e9d45366da6deb3c27>

Adds a Python version of

@Since("3.1.0")
def test(
    dataset: DataFrame,
    featuresCol: String,
    labelCol: String,
    flatten: Boolean): DataFrame

to make it consistent between Scala and Python.
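
A usage sketch of the Scala signature quoted above (the dataset and column
names are placeholders):

import org.apache.spark.ml.stat.ANOVATest

// flatten = true returns one row per feature instead of a single row of arrays
val result = ANOVATest.test(dataset, "features", "label", flatten = true)
result.show()
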
<https://github.com/databricks/runtime/wiki/_new#24spark-31676ml-quantilediscretizer-raise-error-parameter-splits-given-invalid-value-splits-array-includes--00-and-00-30--0>[2.4][SPARK-31676][ML]
QuantileDiscretizer raises the error "parameter splits given invalid value"
when the splits array includes -0.0 and 0.0 (+30, -0)>
<https://github.com/apache/spark/commit/b2300fca1e1a22d74c6eeda37942920a6c6299ff>

In QuantileDiscretizer.getDistinctSplits, before invoking distinct, normalize
-0.0 to 0.0 so that -0.0 and 0.0 are treated as the same split:

    for (i <- 0 until splits.length) {
      if (splits(i) == -0.0) {
        splits(i) = 0.0
      }
    }

<https://github.com/databricks/runtime/wiki/_new#30spark-31681mlpyspark-python-multiclass-logistic-regression-evaluate-should-return-logisticregressionsummary-9--2>[3.0][SPARK-31681][ML][PYSPARK]
Python multiclass logistic regression evaluate should return
LogisticRegressionSummary (+9, -2)>
<https://github.com/apache/spark/commit/e10516ae63cfc58f2d493e4d3f19940d45c8f033>

Returns a LogisticRegressionSummary from multiclass logistic regression
evaluate() in PySpark.
<https://github.com/databricks/runtime/wiki/_new#other>OTHER
<https://github.com/databricks/runtime/wiki/_new#24spark-31655build-upgrade-snappy-java-to-1175-4--4>[2.4][SPARK-31655][BUILD]
Upgrade snappy-java to 1.1.7.5 (+4, -4)>
<https://github.com/apache/spark/commit/0d9faf602ec1c381c56926c432268869ee1df19c>
