[ https://issues.apache.org/jira/browse/SPARK-51119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Szehon Ho updated SPARK-51119:
------------------------------
    Description: 
Summary: Spark executors unnecessarily contact catalogs when resolving 
EXISTS_DEFAULT (used to provide default values for existing data).

Detailed explanation: The code path for default values first runs an analysis 
of the user-provided CURRENT_DEFAULT expression (to evaluate functions, etc.) 
and stores the resulting SQL as EXISTS_DEFAULT. EXISTS_DEFAULT is saved so that 
existing data does not have to be rewritten (backfilled) to write this value 
into the files.
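
For illustration, a minimal sketch of how the two metadata keys differ (the 
table and column names are made up, and the exact stored strings are 
assumptions; they depend on the data source and Spark version):

{code}
// Hypothetical example: the user-supplied default is analyzed once, and the
// constant-folded result is what gets stored as the "exists default":
//
//   CURRENT_DEFAULT -> "current_date()"     (re-evaluated for newly written rows)
//   EXISTS_DEFAULT  -> "DATE '2025-02-07'"  (a literal, applied to rows in old files)

spark.sql("CREATE TABLE t (id INT) USING parquet")
spark.sql("INSERT INTO t VALUES (1)")  // row written before the column exists
spark.sql("ALTER TABLE t ADD COLUMN d DATE DEFAULT current_date()")
spark.sql("SELECT id, d FROM t").show()  // old row is filled using the stored EXISTS_DEFAULT
{code}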

When reading existing files, Spark then attempts to resolve the EXISTS_DEFAULT 
metadata and uses the value for any nulls it finds in that column. But this 
step redundantly re-runs all the analyzer rules and the finish-analysis rules, 
some of which contact the catalog unnecessarily.
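
Because the stored EXISTS_DEFAULT is already the SQL text of a resolved, 
constant-foldable expression, it could in principle be turned back into a value 
with just the parser and constant folding, without any catalog lookups. A rough 
sketch of that idea (not the actual Spark fix; the stored string below is a 
made-up example):

{code}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Hypothetical stored metadata value; real values depend on the column's default.
val existsDefaultSql = "CAST('2025-02-07' AS DATE)"

val expr = CatalystSqlParser.parseExpression(existsDefaultSql)
val value = expr match {
  case lit: Literal    => lit.value   // already a literal: nothing to analyze
  case e if e.foldable => e.eval()    // constant-fold without touching any catalog
  case other           => sys.error(s"Unexpected non-constant exists default: $other")
}
{code}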

This may cause exceptions if the executors are not properly configured to reach 
the catalog, for example:

{code}
Caused by: org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog.
  at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2400)
  at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:84)
  at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:72)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:94)
  at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:94)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:93)
  at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:55)
  at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:130)
  at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:101)
  at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:172)
  at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:169)
  at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:502)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:502)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:301)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:266)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:427)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:425)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:423)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:498)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:496)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.existenceDefaultValues(ResolveDefaultColumnsUtil.scala)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:350)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:373)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.$anonfun$apply$5(ParquetFileFormat.scala:441)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1561)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:428)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:258)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:639)
  ... 21 more
Caused by: java.lang.IllegalStateException: No active or default Spark session found
{code}



> Readers on executors resolving EXISTS_DEFAULT should not call catalogs
> ----------------------------------------------------------------------
>
>                 Key: SPARK-51119
>                 URL: https://issues.apache.org/jira/browse/SPARK-51119
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0
>            Reporter: Szehon Ho
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
