[ https://issues.apache.org/jira/browse/SPARK-51119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Szehon Ho updated SPARK-51119:
------------------------------
Description:

Summary: Spark executors unnecessarily contact catalogs when resolving EXISTS_DEFAULT (the default value used for existing data).

Detailed explanation: The code path for default values first runs an analysis of the user-provided CURRENT_DEFAULT (to evaluate functions, etc.) and uses the resulting SQL as EXISTS_DEFAULT. EXISTS_DEFAULT is saved so that existing data does not have to be rewritten (backfilled) with the default value in the files.

When reading existing files, Spark then attempts to resolve the EXISTS_DEFAULT metadata and uses its value for null values it finds in that column. But this step redundantly re-runs all the analyzer and finish-analysis rules, some of which contact the catalog unnecessarily (see the sketches after the stack trace below).

This may cause exceptions if the executors are not configured to reach the catalog, such as:

{code}
Caused by: org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog.
  at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2400)
  at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:84)
  at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:72)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:94)
  at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:94)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:93)
  at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:55)
  at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:130)
  at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:101)
  at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:172)
  at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:169)
  at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:502)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:502)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:301)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:266)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:427)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:425)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:423)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:498)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:496)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.existenceDefaultValues(ResolveDefaultColumnsUtil.scala)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:350)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:373)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.$anonfun$apply$5(ParquetFileFormat.scala:441)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1561)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:428)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:258)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:639)
  ... 21 more
Caused by: java.lang.IllegalStateException: No active or default Spark session found
{code}
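For illustration only (not taken from the original report): a minimal sketch of how the situation can arise, assuming a hypothetical parquet table {{t}} and that default-column support is enabled; exact DDL support may vary by data source and configuration.

{code}
// Illustration only: table name, values, and app name are made up.
import org.apache.spark.sql.SparkSession

object ExistsDefaultRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("exists-default-repro").getOrCreate()

    // Files for rows (1) and (2) are written before column `c` exists.
    spark.sql("CREATE TABLE t (id INT) USING parquet")
    spark.sql("INSERT INTO t VALUES (1), (2)")

    // Adding a column with a DEFAULT records CURRENT_DEFAULT (for new writes) and
    // EXISTS_DEFAULT (the value to substitute when old files lack the column).
    spark.sql("ALTER TABLE t ADD COLUMN c INT DEFAULT 42")

    // Scanning the old files makes the executor-side parquet readers resolve
    // EXISTS_DEFAULT for `c`; today that re-runs analysis rules that may try to
    // load the session catalog on the executor (the failure in the stack trace).
    spark.table("t").show()

    spark.stop()
  }
}
{code}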
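Also for illustration: a sketch of why resolving EXISTS_DEFAULT should not need a catalog. By the time a reader sees it, EXISTS_DEFAULT already holds the analyzed, constant form of the user's default, so for a simple constant parsing and evaluating the stored text is enough. The metadata key name below follows the ResolveDefaultColumns constants ("EXISTS_DEFAULT"); everything else is hypothetical and is not the actual reader code path.

{code}
// Sketch only: shows that a stored constant default can be recovered without a catalog.
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField}

object ExistsDefaultSketch {
  def main(args: Array[String]): Unit = {
    // EXISTS_DEFAULT is stored as the already-analyzed SQL of CURRENT_DEFAULT, so the
    // text a reader sees is a constant expression such as "42".
    val field = StructField(
      "c",
      IntegerType,
      nullable = true,
      metadata = new MetadataBuilder().putString("EXISTS_DEFAULT", "42").build())

    // For a constant like this, parsing and evaluating the stored text recovers the fill
    // value directly: no analyzer run, no FinishAnalysis rules, and therefore no
    // CatalogManager access on the executor.
    val parsed = CatalystSqlParser.parseExpression(field.metadata.getString("EXISTS_DEFAULT"))
    val fillValue = Literal.create(parsed.eval(), field.dataType)
    println(fillValue) // 42
  }
}
{code}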
> Readers on executors resolving EXISTS_DEFAULT should not call catalogs
> ----------------------------------------------------------------------
>
>                 Key: SPARK-51119
>                 URL: https://issues.apache.org/jira/browse/SPARK-51119
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0
>            Reporter: Szehon Ho
>            Priority: Major
at > org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2400) > at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:84) > at > org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:72) > at > org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:94) > at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86) at > org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:94) > at scala.Option.map(Option.scala:230) at > org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:93) > at > org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:55) > at > org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:130) > at > org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:101) > at > org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:172) > at > org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:169) > at > org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:502) > at > scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at > scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) > at scala.collection.immutable.List.foldLeft(List.scala:91) at > org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:502) > at > org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:301) > at > org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:266) > at > org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:427) > at scala.Option.map(Option.scala:230) at > org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:425) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at > scala.collection.TraversableLike.map(TraversableLike.scala:286) at > scala.collection.TraversableLike.map$(TraversableLike.scala:279) at > scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198) at > org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:423) > at > org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:498) > at scala.Option.getOrElse(Option.scala:189) at > org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:496) > at > org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.existenceDefaultValues(ResolveDefaultColumnsUtil.scala) > at > org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:350) > at > 
--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org