[jira] [Updated] (SPARK-27526) Driver OOM error occurs while writing parquet file with Append mode

2019-04-25 Thread Hyukjin Kwon (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-27526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-27526:
-
Target Version/s:   (was: 2.1.1)

> Driver OOM error occurs while writing parquet file with Append mode
> ---
>
> Key: SPARK-27526
> URL: https://issues.apache.org/jira/browse/SPARK-27526
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output, SQL
>Affects Versions: 2.1.1
> Environment: centos6.7
>Reporter: senyoung
>Priority: Major
>  Labels: oom
>
> As in this user code:
> {code:java}
> someDataFrame.write
>   .mode(SaveMode.Append)
>   .partitionBy(somePartitionKeySeqs: _*)
>   .parquet(targetPath)
> {code}
> When Spark tries to write parquet files to HDFS with SaveMode.Append, it must
> check that the requested partition columns match those of the files that
> already exist. However, this check caches the info of every leaf file under
> the targetPath, which can easily trigger a driver OOM when the targetPath
> contains too many files. The check is useful when exact correctness is
> required, but I think it should be optional so that the OOM can be avoided.
> The relevant code is here:
> {code:java}
> // package org.apache.spark.sql.execution.datasources
> // case class DataSource
> private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
>   ...
>   // Can we make this check optional?
>   if (mode == SaveMode.Append) {
>     val existingPartitionColumns = Try {
>       // getOrInferFileFormatSchema(format, justPartitioning = true) may cause
>       // an OOM when there are too many files; could we sample a limited number
>       // of files rather than listing every existing file?
>       getOrInferFileFormatSchema(format, justPartitioning = true)
>         ._2.fieldNames.toList
>     }.getOrElse(Seq.empty[String])
>     val sameColumns =
>       existingPartitionColumns.map(_.toLowerCase()) ==
>         partitionColumns.map(_.toLowerCase())
>     if (existingPartitionColumns.nonEmpty && !sameColumns) {
>       throw new AnalysisException(
>         s"""Requested partitioning does not match existing partitioning.
>            |Existing partitioning columns:
>            |  ${existingPartitionColumns.mkString(", ")}
>            |Requested partitioning columns:
>            |  ${partitionColumns.mkString(", ")}
>            |""".stripMargin)
>     }
>   }
>   ...
> }
> 
> private def getOrInferFileFormatSchema(
>     format: FileFormat,
>     justPartitioning: Boolean = false): (StructType, StructType) = {
>   lazy val tempFileIndex = {
>     val allPaths = caseInsensitiveOptions.get("path") ++ paths
>     val hadoopConf = sparkSession.sessionState.newHadoopConf()
>     val globbedPaths = allPaths.toSeq.flatMap { path =>
>       val hdfsPath = new Path(path)
>       val fs = hdfsPath.getFileSystem(hadoopConf)
>       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
>       SparkHadoopUtil.get.globPathIfNecessary(qualified)
>     }.toArray
>     // InMemoryFileIndex.refresh0() caches the info of every file: OOM risk.
>     new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
>   }
>   ...
> }
> {code}
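> A minimal sketch of what "making it optional" could look like. This is not
> existing Spark behavior; the config key below is hypothetical and only
> illustrates gating the Append-mode partition check behind a flag:
> {code:java}
> // Hypothetical sketch: "spark.sql.sources.append.verifyPartitionColumns"
> // is an invented config key, not a real Spark option.
> val verifyPartitionColumns = sparkSession.sessionState.conf
>   .getConfString("spark.sql.sources.append.verifyPartitionColumns", "true")
>   .toBoolean
> 
> if (mode == SaveMode.Append && verifyPartitionColumns) {
>   // ... the existing getOrInferFileFormatSchema-based check ...
> }
> {code}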
> 
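> Until something like that exists, one user-side workaround (a sketch with
> hypothetical column and path names) is to append each partition's data
> directly into its own partition directory, so Spark never has to list the
> whole targetPath:
> {code:java}
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.col
> 
> // Workaround sketch: "dt" and targetPath are placeholders. Writing straight
> // to the partition directory skips partition-column inference entirely.
> val dt = "2019-04-20"
> someDataFrame
>   .filter(col("dt") === dt) // keep only this partition's rows
>   .drop("dt")               // the value is encoded in the path instead
>   .write
>   .mode(SaveMode.Append)
>   .parquet(s"$targetPath/dt=$dt")
> {code}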



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27526) Driver OOM error occurs while writing parquet file with Append mode

2019-04-21 Thread senyoung (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-27526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

senyoung updated SPARK-27526:
-
Labels: oom  (was: easyfix oom)




[jira] [Updated] (SPARK-27526) Driver OOM error occurs while writing parquet file with Append mode

2019-04-21 Thread senyoung (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-27526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

senyoung updated SPARK-27526:
-
Labels: easyfix oom  (was: easyfix)




[jira] [Updated] (SPARK-27526) Driver OOM error occurs while writing parquet file with Append mode

2019-04-20 Thread senyoung (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-27526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

senyoung updated SPARK-27526:
-
Description: 
(the updated description is the one quoted in full above)
  was:
As in this user code:
{code:java}
someDataFrame.write
  .mode(SaveMode.Append)
  .partitionBy(somePartitionKeySeqs: _*)
  .parquet(targetPath)
{code}
When Spark tries to write parquet files to HDFS with SaveMode.Append, it must
check that the requested partition columns match those of the files that
already exist. However, this check caches the info of every leaf file under
the targetPath, which can easily trigger a driver OOM when the targetPath
contains too many files. The check is useful when exact correctness is
required, but I think it should be optional so that the OOM can be avoided.

The relevant code is here:
{code:java}
// package org.apache.spark.sql.execution.datasources
// case class DataSource

private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
  ...
  if (mode == SaveMode.Append) { // can we make it optional?
    val existingPartitionColumns = Try {
      // getOrInferFileFormatSchema(format, justPartitioning = true) may cause
      // an OOM when there are too many files; could we sample a limited number
      // of files rather than listing every existing file?
      getOrInferFileFormatSchema(format, justPartitioning = true)
        ._2.fieldNames.toList
    }.getOrElse(Seq.empty[String])
    // TODO: Case sensitivity.
    val sameColumns =
      existingPartitionColumns.map(_.toLowerCase()) ==
        partitionColumns.map(_.toLowerCase())
    if (existingPartitionColumns.nonEmpty && !sameColumns) {
      throw new AnalysisException(
        s"""Requested partitioning does not match existing partitioning.
           |Existing partitioning columns:
           |  ${existingPartitionColumns.mkString(", ")}
           |Requested partitioning columns:
           |  ${partitionColumns.mkString(", ")}
           |""".stripMargin)
    }
  }
  ...
}

private def getOrInferFileFormatSchema(
    format: FileFormat,
    justPartitioning: Boolean = false): (StructType, StructType) = {
  // The operations below are expensive, so try not to do them if we don't
  // need to; e.g., in streaming mode we have already inferred and registered
  // partition columns and will never have to materialize the lazy val below.
  lazy val tempFileIndex = {
    val allPaths = caseInsensitiveOptions.get("path") ++ paths
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val globbedPaths =