[ https://issues.apache.org/jira/browse/SPARK-34204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17340038#comment-17340038 ]
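Nick Hryhoriev commented on SPARK-34204:
----------------------------------------

I wrote some very simple code to work around it. It is a little bit hacky, but it still works for me.

{code:java}
import org.apache.spark.TaskContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.types.StringType

// Wrapper object (name is illustrative): Scala 2 does not allow top-level implicit classes.
object DataFrameFileSyntax {
  implicit class EnrichWithFilePathAndModificationTime(df: DataFrame) {
    // Appends a string column holding the path of the file each row was read from.
    def withFilePath(fileColumn: String)(implicit spark: SparkSession): DataFrame = {
      // On the driver: map each Spark partition index to the single file it reads.
      // Assumes df is a plain file scan; any other partition type fails with a MatchError.
      val existingFilesByPartition = df.rdd
        .partitions
        .map { case partition: FilePartition =>
          assert(partition.files.length == 1) // Spark must be configured to read one file per partition.
          partition.index -> partition.files.head.filePath
        }
        .toMap

      // Encoder for the original schema plus the new string column.
      val partIdRowEncoder = RowEncoder.apply(df.schema.add(fileColumn, StringType))

      df.mapPartitions { it =>
        // On the executor: look up the file of the partition this task is processing.
        val sparkPartitionId = TaskContext.get().partitionId()
        val file = existingFilesByPartition(sparkPartitionId)
        it.map(r => Row.fromSeq(r.toSeq ++ Seq(file)))
      }(partIdRowEncoder)
    }
  }
}
{code}

This code snippet works only together with the following setting:

{code:java}
import org.apache.spark.SparkConf

// Do not change: our custom logic requires exactly one file per Spark partition.
val conf = new SparkConf()
  .set("spark.sql.files.openCostInBytes", Int.MaxValue.toString)
{code}

This may not suit use cases with a very large number of small files, because every file then becomes its own partition. Please advise if anyone knows a better way.

> When use input_file_name() func all column from file appeared in physical
> plan of query, not only projection.
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-34204
>                 URL: https://issues.apache.org/jira/browse/SPARK-34204
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.7
>            Reporter: Nick Hryhoriev
>            Priority: Major
>
> Using the input_file_name() function breaks column pruning (projection) in the
> physical plan of the query. If this function is used to add a new column,
> column-oriented formats like Parquet and ORC read all columns in the physical
> plan, while without it only the selected columns are read.
> In my case, the performance impact is 30x.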
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
>
> object TestSize {
>   def main(args: Array[String]): Unit = {
>     implicit val spark: SparkSession = SparkSession.builder()
>       .master("local")
>       .config("spark.sql.shuffle.partitions", "5")
>       .getOrCreate()
>     import spark.implicits._
>
>     val query1 = spark.read.parquet(
>       "s3a://part-00040-a19f0d20-eab3-48ef-be5a-602c7f9a8e58.c000.gz.parquet"
>     )
>       .select($"app_id", $"idfa", input_file_name().as("fileName"))
>       .distinct()
>       .count()
>
>     val query2 = spark.read.parquet(
>       "s3a://part-00040-a19f0d20-eab3-48ef-be5a-602c7f9a8e58.c000.gz.parquet"
>     )
>       .select($"app_id", $"idfa")
>       .distinct()
>       .count()
>
>     // Keep the application (and its Spark UI) alive so both plans can be compared.
>     Thread.sleep(10000000000L)
>   }
> }
> {code}
> `query1` has all columns in the physical plan, while `query2` has only two.
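The `spark.sql.files.openCostInBytes` trick in the comment above relies on how Spark packs file splits into read partitions. Below is a simplified, self-contained model of that packing step, loosely modeled on Spark 2.4's `FileSourceScanExec.createNonBucketedReadRDD`. It is not Spark's actual code: the names `FilePackingModel`, `Split`, and `pack` are hypothetical, and non-splittable files, bucketing, and partition directories are ignored.

```scala
// Simplified model (NOT Spark's source) of how Spark 2.4 packs file splits into
// read partitions. Hypothetical names; ignores non-splittable files and bucketing.
object FilePackingModel {
  // Hypothetical stand-in for Spark's PartitionedFile: a file path and a chunk length.
  final case class Split(path: String, length: Long)

  // maxSplitBytes = min(maxPartitionBytes, max(openCostInBytes, bytesPerCore))
  def maxSplitBytes(sizes: Seq[Long], maxPartitionBytes: Long,
                    openCostInBytes: Long, defaultParallelism: Int): Long = {
    val totalBytes = sizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  }

  // Returns the splits assigned to each read partition, in partition order.
  def pack(files: Seq[(String, Long)], maxPartitionBytes: Long,
           openCostInBytes: Long, defaultParallelism: Int): List[Vector[Split]] = {
    val maxSplit =
      maxSplitBytes(files.map(_._2), maxPartitionBytes, openCostInBytes, defaultParallelism)

    // Cut every file into chunks of at most maxSplit bytes, then sort largest first.
    val splits = files.flatMap { case (path, len) =>
      (0L until len by maxSplit).map(offset => Split(path, math.min(maxSplit, len - offset)))
    }.sortBy(split => -split.length)

    // "Next fit decreasing": start a new partition when the next split would overflow.
    val partitions = List.newBuilder[Vector[Split]]
    var current = Vector.empty[Split]
    var currentSize = 0L
    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current
      current = Vector.empty
      currentSize = 0L
    }
    splits.foreach { split =>
      if (currentSize + split.length > maxSplit) closePartition()
      // Every split is charged its length plus the fixed open cost.
      currentSize += split.length + openCostInBytes
      current :+= split
    }
    closePartition()
    partitions.result()
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    val files = Seq("a.parquet" -> 1 * mb, "b.parquet" -> 2 * mb, "c.parquet" -> 3 * mb)
    // Spark defaults: maxPartitionBytes = 128 MB, openCostInBytes = 4 MB.
    println(pack(files, 128 * mb, 4 * mb, defaultParallelism = 1).map(_.map(_.path)))
    // The workaround: openCostInBytes = Int.MaxValue.
    println(pack(files, 128 * mb, Int.MaxValue.toLong, defaultParallelism = 1).map(_.map(_.path)))
  }
}
```

In this model, the default 4 MB open cost puts the three small files into one partition. With `Int.MaxValue`, the first split alone already exceeds the max split size, so every split gets its own partition, which is what the `assert` in the workaround depends on. A file larger than the max split size still becomes several partitions, each holding one split of that file, so the assert keeps holding; the price is one task per file (or per split), which is the small-files cost mentioned in the comment.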