Hi Fengdong,

Why does the driver need more memory when there are many partitions? It 
seems the implementation only supports use cases with about a dozen partitions; 
once there are more than 100, it falls apart. It is also quite slow to 
initialize the loading of partitioned tables when the number of partitions is 
over 100. 
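
To make the scaling concern concrete: the stack trace quoted below shows 
listLeafFiles calling Path.toString inside a map over file statuses, which 
suggests the driver builds one path string per leaf file. The following is a 
rough Python simulation of that pattern, not Spark's actual code. The base 
path, the file naming scheme, and the helper names (leaf_paths, listing_cost) 
are assumptions made up for illustration, and JVM object sizes will differ 
from what Python reports.

```python
import sys


def leaf_paths(base, partition_column, partition_values, files_per_partition=1):
    """Yield one path string per leaf file in a Hive-style partitioned layout.

    The layout (column=value directories holding part files) mirrors what
    partitionBy produces; the exact file names here are hypothetical.
    """
    for value in partition_values:
        for i in range(files_per_partition):
            yield f"{base}/{partition_column}={value}/part-{i:05d}.parquet"


def listing_cost(num_partitions, base="hdfs://namenode/warehouse/events"):
    """Return (number of paths, total bytes of the path strings).

    A real listing would keep all of these objects alive at the same time;
    here we only add up their sizes so the sketch stays cheap to run.
    """
    count = 0
    total_bytes = 0
    for path in leaf_paths(base, "customer_id", range(num_partitions)):
        count += 1
        total_bytes += sys.getsizeof(path)
    return count, total_bytes


if __name__ == "__main__":
    # The cost grows linearly with the number of partition directories.
    for n in (12, 100, 1_000_000):
        count, size = listing_cost(n)
        print(f"{n:>9} partitions -> {count:>9} paths, ~{size / 1e6:.2f} MB of strings")
```

This only counts the path strings. A real file status also carries length, 
modification time, permissions and so on, so the true per-file overhead on the 
driver is presumably larger than this sketch reports.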

Best Regards,

Jerry

> On 26 Oct, 2015, at 2:50 am, Fengdong Yu <fengdo...@everstring.com> wrote:
> 
> How many partitions did you generate?
> If millions were generated, then a huge amount of memory is consumed.
> 
> 
> 
> 
> 
>> On Oct 26, 2015, at 10:58 AM, Jerry Lam <chiling...@gmail.com> wrote:
>> 
>> Hi guys,
>> 
>> As I mentioned, the partitions were generated, so I tried to read the 
>> partitioned data back. The driver went OOM after a few minutes. The stack 
>> trace is below. It looks very similar to the jstack I posted earlier (note 
>> the refresh method). Thanks!
>> 
>> Name: java.lang.OutOfMemoryError
>> Message: GC overhead limit exceeded
>> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
>> java.lang.StringBuilder.append(StringBuilder.java:132)
>> org.apache.hadoop.fs.Path.toString(Path.java:384)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
>> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
>> scala.Option.getOrElse(Option.scala:120)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
>> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
>> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
>> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:31)
>> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
>> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
>> 
>>> On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>> Hi Josh,
>>> 
>>> No, I don't have speculation enabled. The driver ran for a few hours before 
>>> it went OOM. Interestingly, all partitions were generated successfully 
>>> (the _SUCCESS file is written in the output directory). Is there a reason 
>>> why the driver needs so much memory? The jstack revealed that it called 
>>> refresh on some file statuses. Is there a way to keep the 
>>> OutputCommitCoordinator from using so much memory? 
>>> 
>>> Ultimately, I chose to use partitions because most of my queries filter on 
>>> the partition field. For example, "SELECT events FROM customer WHERE 
>>> customer_id = 1234". If the data is partitioned by customer_id, all events 
>>> for a customer can be retrieved without scanning the entire dataset, which 
>>> is much more efficient (I hope). However, the partitioning implementation 
>>> does not seem to support this type of use case without using a lot of 
>>> memory, which is a bit odd in my opinion. Any help would be greatly 
>>> appreciated.
>>> 
>>> Best Regards,
>>> 
>>> Jerry
>>> 
>>> 
>>> 
>>>> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <rosenvi...@gmail.com> wrote:
>>>> Hi Jerry,
>>>> 
>>>> Do you have speculation enabled? A write which produces one million files 
>>>> / output partitions might be using tons of driver memory via the 
>>>> OutputCommitCoordinator's bookkeeping data structures.
>>>> 
>>>>> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>> Hi spark guys,
>>>>> 
>>>>> I think I hit the same issue as SPARK-8890 
>>>>> (https://issues.apache.org/jira/browse/SPARK-8890). It is marked as 
>>>>> resolved, but it does not appear to be. I have over a million output 
>>>>> directories for a single column in partitionBy. Could this be a 
>>>>> regression? Do I need to set some parameters to make it more memory 
>>>>> efficient?
>>>>> 
>>>>> Best Regards,
>>>>> 
>>>>> Jerry
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>>> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>>> Hi guys,
>>>>>> 
>>>>>> After waiting for a day, the job actually caused an OOM on the Spark 
>>>>>> driver. I configured the driver with 6GB. Note that I didn't call 
>>>>>> refresh myself; the method was called when saving the DataFrame in 
>>>>>> Parquet format. Also, I'm using partitionBy() on the DataFrameWriter to 
>>>>>> generate over 1 million files. I'm not sure why the driver goes OOM 
>>>>>> after the job is marked _SUCCESS in the output folder. 
>>>>>> 
>>>>>> Best Regards,
>>>>>> 
>>>>>> Jerry
>>>>>> 
>>>>>> 
>>>>>>> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>>>> Hi Spark users and developers,
>>>>>>> 
>>>>>>> Has anyone encountered an issue where a Spark SQL job that produces a 
>>>>>>> lot of files (over 1 million) hangs on the refresh method? I'm using 
>>>>>>> Spark 1.5.1. Below is the stack trace. I can see that the Parquet files 
>>>>>>> are produced, but the driver is doing something very CPU-intensive (it 
>>>>>>> uses all the CPUs). Does this mean Spark SQL cannot be used to produce 
>>>>>>> over 1 million files in a single job?
>>>>>>> 
>>>>>>> Thread 528: (state = BLOCKED)
>>>>>>>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
>>>>>>>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130 (Compiled frame)
>>>>>>>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12, line=114 (Compiled frame)
>>>>>>>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19, line=415 (Compiled frame)
>>>>>>>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132 (Compiled frame)
>>>>>>>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus) @bci=4, line=447 (Compiled frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object) @bci=5, line=447 (Compiled frame)
>>>>>>>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=9, line=244 (Compiled frame)
>>>>>>>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=2, line=244 (Compiled frame)
>>>>>>>  - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>>>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Compiled frame)
>>>>>>>  - scala.collection.TraversableLike$class.map(scala.collection.TraversableLike, scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244 (Compiled frame)
>>>>>>>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1, scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[]) @bci=279, line=447 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh() @bci=8, line=453 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute() @bci=26, line=465 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache() @bci=12, line=463 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1, line=540 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh() @bci=1, line=204 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp() @bci=392, line=152 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext, org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96, line=56 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext) @bci=718, line=108 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute() @bci=20, line=57 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult() @bci=15, line=57 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12, line=69 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=11, line=140 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=1, line=138 (Interpreted frame)
>>>>>>>  - org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext, java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189, line=138 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute() @bci=21, line=933 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13, line=933 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext, java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode, scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293, line=197 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.DataFrameWriter.save(java.lang.String) @bci=24, line=137 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String) @bci=8, line=304 (Interpreted frame)
>>>>>>> 
>>>>>>> Best Regards,
>>>>>>> 
>>>>>>> Jerry
> 
