[ https://issues.apache.org/jira/browse/SPARK-27526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
senyoung updated SPARK-27526:
-----------------------------
    Labels: easyfix oom  (was: easyfix)

> Driver OOM error occurs while writing parquet file with Append mode
> -------------------------------------------------------------------
>
>                 Key: SPARK-27526
>                 URL: https://issues.apache.org/jira/browse/SPARK-27526
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, SQL
>    Affects Versions: 2.1.1
>         Environment: centos6.7
>            Reporter: senyoung
>            Priority: Major
>              Labels: easyfix, oom
>
> As in this user code below:
> {code:java}
> someDataFrame.write
>   .mode(SaveMode.Append)
>   .partitionBy(somePartitionKeySeqs)
>   .parquet(targetPath);
> {code}
> When Spark tries to write parquet files into HDFS with SaveMode.Append mode,
> it must check that the requested partition columns match those of the existing
> files. However, this check caches the info of every leaf file under the
> targetPath, which can easily trigger an OOM in the driver when there are too
> many files under that path.
> This behavior is useful when exact correctness is needed, but I think it
> should be optional so the OOM can be avoided.
> The linked code is here:
> {code:java}
> // package org.apache.spark.sql.execution.datasources
> // case class DataSource
> private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
>   ...
>   // Can we make this check optional?
>   if (mode == SaveMode.Append) {
>     val existingPartitionColumns = Try {
>       // getOrInferFileFormatSchema(format, justPartitioning = true) may cause
>       // an OOM when there are too many files; could we just sample a limited
>       // number of files rather than all existing files?
>       getOrInferFileFormatSchema(format, justPartitioning = true)
>         ._2.fieldNames.toList
>     }.getOrElse(Seq.empty[String])
>     val sameColumns =
>       existingPartitionColumns.map(_.toLowerCase()) ==
>         partitionColumns.map(_.toLowerCase())
>     if (existingPartitionColumns.nonEmpty && !sameColumns) {
>       throw new AnalysisException(
>         s"""Requested partitioning does not match existing partitioning.
>            |Existing partitioning columns:
>            |  ${existingPartitionColumns.mkString(", ")}
>            |Requested partitioning columns:
>            |  ${partitionColumns.mkString(", ")}
>            |""".stripMargin)
>     }
>   }
>   ...
> }
>
> private def getOrInferFileFormatSchema(
>     format: FileFormat,
>     justPartitioning: Boolean = false): (StructType, StructType) = {
>   lazy val tempFileIndex = {
>     val allPaths = caseInsensitiveOptions.get("path") ++ paths
>     val hadoopConf = sparkSession.sessionState.newHadoopConf()
>     val globbedPaths = allPaths.toSeq.flatMap { path =>
>       val hdfsPath = new Path(path)
>       val fs = hdfsPath.getFileSystem(hadoopConf)
>       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
>       SparkHadoopUtil.get.globPathIfNecessary(qualified)
>     }.toArray
>     // InMemoryFileIndex.refresh0() caches the info of all files: OOM risk
>     new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
>   }
>   ...
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
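The report suggests sampling a limited number of files for the partition-column check instead of listing every leaf file. A minimal sketch of that idea in plain Scala, outside of Spark (all names here are hypothetical illustrations, not the actual Spark API): partition columns are recoverable from the `key=value` directory segments of any one leaf path, so a bounded sample is usually enough to run the compatibility check while keeping driver memory bounded.

```scala
// Sketch: recover partition column names from a bounded sample of leaf-file
// paths rather than caching info for every file under the target path.
// Hypothetical helper, not the Spark implementation.
object PartitionColumnSampler {
  // Partition columns are the "key" parts of "key=value" path segments,
  // e.g. "/data/year=2019/month=04/part-0.parquet" -> List("year", "month").
  def partitionColumnsOf(path: String): List[String] =
    path.split('/').toList
      .filter(_.contains('='))
      .map(_.takeWhile(_ != '='))

  // Inspect at most `maxFiles` paths; when the layout is consistent, the
  // first partitioned path already yields the full column list, so this
  // bounds memory use instead of scanning all existing files.
  def inferFromSample(paths: Seq[String], maxFiles: Int = 100): List[String] =
    paths.take(maxFiles)
      .map(partitionColumnsOf)
      .find(_.nonEmpty)
      .getOrElse(Nil)
}
```

The trade-off the report itself notes still applies: a sample can miss an inconsistent layout deeper in the tree, which is why the exhaustive check is the safe default and sampling would only be an opt-in.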