[ https://issues.apache.org/jira/browse/SPARK-21885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16149997#comment-16149997 ]
liupengcheng edited comment on SPARK-21885 at 9/1/17 1:25 PM:
--------------------------------------------------------------

[~viirya] I think it's necessary. Consider this scenario: you have a scheduled (timer) job, and your schema varies over time. You need to read the historical data with the old schema, but you do not want to use `INFER_AND_SAVE` and change the current schema. What's more, even if you use `INFER_AND_SAVE`, it seems that it will still infer the schema; although there is some caching, I don't think that is enough, since the first execution of any query in each session would still be very slow.

For reference, the current `inferIfNeeded` implementation:

{code:java}
private def inferIfNeeded(
    relation: MetastoreRelation,
    options: Map[String, String],
    fileFormat: FileFormat,
    fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
  val inferenceMode = sparkSession.sessionState.conf.caseSensitiveInferenceMode
  val shouldInfer = (inferenceMode != NEVER_INFER) && !relation.catalogTable.schemaPreservesCase
  val tableName = relation.catalogTable.identifier.unquotedString
  if (shouldInfer) {
    logInfo(s"Inferring case-sensitive schema for table $tableName (inference mode: " +
      s"$inferenceMode)")
    val fileIndex = fileIndexOpt.getOrElse {
      val rootPath = new Path(relation.catalogTable.location)
      new InMemoryFileIndex(sparkSession, Seq(rootPath), options, None)
    }

    val inferredSchema = fileFormat
      .inferSchema(
        sparkSession,
        options,
        fileIndex.listFiles(Nil).flatMap(_.files))
      .map(mergeWithMetastoreSchema(relation.catalogTable.schema, _))

    inferredSchema match {
      case Some(schema) =>
        if (inferenceMode == INFER_AND_SAVE) {
          updateCatalogSchema(relation.catalogTable.identifier, schema)
        }
        (schema, relation.catalogTable.copy(schema = schema))
      case None =>
        logWarning(s"Unable to infer schema for table $tableName from file format " +
          s"$fileFormat (inference mode: $inferenceMode). Using metastore schema.")
        (relation.catalogTable.schema, relation.catalogTable)
    }
  } else {
    (relation.catalogTable.schema, relation.catalogTable)
  }
}
{code}
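As a minimal sketch of the workaround argued for above: since `shouldInfer` is false under `NEVER_INFER`, a session reading old data with the existing metastore schema can disable inference entirely. This assumes the metastore schema already matches the old files; `history_table` is only a placeholder name.

{code:java}
import org.apache.spark.sql.SparkSession

object ReadOldSchemaExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-history-data")
      .enableHiveSupport()
      // Use the metastore schema as-is and skip inferIfNeeded's
      // file listing and footer reading entirely.
      .config("spark.sql.hive.caseSensitiveInferenceMode", "NEVER_INFER")
      .getOrCreate()

    // Placeholder table name; the query is resolved against the metastore
    // schema only, so no inference work happens at analysis time.
    spark.sql("SELECT * FROM history_table LIMIT 10").show()
  }
}
{code}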
> HiveMetastoreCatalog.InferIfNeeded too slow when caseSensitiveInference enabled
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-21885
>                 URL: https://issues.apache.org/jira/browse/SPARK-21885
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0, 2.2.0, 2.3.0
>         Environment: `spark.sql.hive.caseSensitiveInferenceMode` set to INFER_ONLY
>            Reporter: liupengcheng
>              Labels: slow, sql
>
> Currently, SparkSQL schema inference is too slow, taking almost 2 minutes.
> I dug into the code and finally found out the reasons:
> 1. During analysis of a LogicalPlan, Spark will try to infer the table schema if `spark.sql.hive.caseSensitiveInferenceMode` is set to INFER_ONLY. It lists all the leaf files under the rootPaths (just the table location) and then calls `getFileBlockLocations` to turn each `FileStatus` into a `LocatedFileStatus`. Calling `getFileBlockLocations` for so many leaf files takes a long time, and it seems that the location info is never used.
> 2. When inferring a Parquet schema, even if there is only one file, it will still launch a Spark job to merge schemas. I think that's expensive.
> The time-costly stack is as follows:
> {code:java}
> at org.apache.hadoop.ipc.Client.call(Client.java:1403)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
> at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy17.getBlockLocations(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1234)
> at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1224)
> at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1274)
> at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:221)
> at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:217)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:217)
> at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:209)
> at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$listLeafFiles$3.apply(PartitioningAwareFileIndex.scala:427)
> at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$listLeafFiles$3.apply(PartitioningAwareFileIndex.scala:410)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$listLeafFiles(PartitioningAwareFileIndex.scala:410)
> at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles$1.apply(PartitioningAwareFileIndex.scala:302)
> at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles$1.apply(PartitioningAwareFileIndex.scala:301)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:301)
> at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:253)
> at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74)
> at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:50)
> at org.apache.spark.sql.execution.datasources.PrunedInMemoryFileIndex.<init>(CatalogFileIndex.scala:108)
> at org.apache.spark.sql.execution.datasources.CatalogFileIndex.filterPartitions(CatalogFileIndex.scala:76)
> at org.apache.spark.sql.execution.datasources.CatalogFileIndex.listFiles(CatalogFileIndex.scala:54)
> at org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$inferIfNeeded(HiveMetastoreCatalog.scala:312)
> at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$5.apply(HiveMetastoreCatalog.scala:243)
> at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$5.apply(HiveMetastoreCatalog.scala:229)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToLogicalRelation(HiveMetastoreCatalog.scala:229)
> at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$.org$apache$spark$sql$hive$HiveMetastoreCatalog$ParquetConversions$$convertToParquetRelation(HiveMetastoreCatalog.scala:357)
> at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:374)
> at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:365)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:309)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
> {code}
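For illustration only, here is a hypothetical sketch (not Spark's actual implementation) of a leaf-file listing that keeps plain `FileStatus` entries and never calls `getFileBlockLocations`, since block locations are not needed just to read Parquet footers for schema inference; the stack above shows that per-file RPC dominating the time.

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Hypothetical helper, not part of Spark: recursively lists leaf files using
// listStatus only, so no extra getBlockLocations RPC is issued per file.
object ListLeafFilesWithoutLocations {
  def listLeafFiles(fs: FileSystem, path: Path): Seq[FileStatus] = {
    val (dirs, files) = fs.listStatus(path).toSeq.partition(_.isDirectory)
    files ++ dirs.flatMap(d => listLeafFiles(fs, d.getPath))
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val leaves = listLeafFiles(fs, new Path(args(0)))
    println(s"Found ${leaves.size} leaf files")
  }
}
{code}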