Hi Bryan,

I think they fixed some of the memory issues in the 1.4 partitioned table 
implementation. 1.5 does much better in terms of executor memory usage when 
generating partitioned tables. However, if your table has more than a few 
thousand partitions, reading it can be challenging: it takes a while to 
initialize the partitioned table, and that requires a lot of memory on the 
driver. I would not use it if the number of partitions goes over a few hundred.
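
As a rough sketch of that rule of thumb, something like the following could 
check how many distinct partition values there are before deciding whether to 
call partitionBy. PartitionGuard, safeToPartition, the sample time bins and 
the limit of 200 are names and values I made up for illustration; they are 
not part of Spark.

```scala
// Sketch only: PartitionGuard, maxPartitions and the sample values below are
// hypothetical and chosen for illustration; none of them come from Spark.
object PartitionGuard {
  // Count the distinct values of the would-be partition column.
  def distinctPartitions[K](keys: Iterable[K]): Int = keys.toSet.size

  // True when the number of distinct partition values stays within the limit.
  // The default of 200 is an assumed stand-in for "a few hundred".
  def safeToPartition[K](keys: Iterable[K], maxPartitions: Int = 200): Boolean =
    distinctPartitions(keys) <= maxPartitions
}

// With a DataFrame, the count could instead come from something like
//   eventsDF.select("windowseventtimebin").distinct().count()
val timeBins = Seq(1446062400L, 1446062400L, 1446066000L, 1446069600L)
val usePartitionBy = PartitionGuard.safeToPartition(timeBins)
```

If usePartitionBy comes back false, the write could skip partitionBy or use 
a coarser time bin so that fewer partitions are created.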

Hope this helps,

Jerry

> On 28 Oct, 2015, at 6:33 pm, Bryan <bryan.jeff...@gmail.com> wrote:
> 
> Jerry,
> 
> Thank you for the note. It sounds like you were able to get further than I 
> have been - any insight? Just a Spark 1.4.1 vs Spark 1.5?
> 
> Regards,
> 
> Bryan Jeffrey
> From: Jerry Lam
> Sent: 10/28/2015 6:29 PM
> To: Bryan Jeffrey
> Cc: Susan Zhang; user
> Subject: Re: Spark -- Writing to Partitioned Persistent Table
> 
> Hi Bryan,
> 
> Did you read the email I sent a few days ago? There are more issues with 
> partitionBy down the road: 
> https://www.mail-archive.com/user@spark.apache.org/msg39512.html
> 
> Best Regards,
> 
> Jerry
> 
>> On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>> 
>> The second issue I'm seeing is an OOM issue when writing partitioned data.  
>> I am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1 & using the Hive 
>> libraries packaged with Spark.  Spark was compiled using the following:
>> 
>>     mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive -Phive-thriftserver package
>> 
>> Given a case class like the following:
>> 
>> case class HiveWindowsEvent(
>>                              targetEntity: String,
>>                              targetEntityType: String,
>>                              dateTimeUtc: Timestamp,
>>                              eventid: String,
>>                              eventData: Map[String, String],
>>                              description: String,
>>                              eventRecordId: String,
>>                              level: String,
>>                              machineName: String,
>>                              sequenceNumber: String,
>>                              source: String,
>>                              sourceMachineName: String,
>>                              taskCategory: String,
>>                              user: String,
>>                              machineIp: String,
>>                              additionalData: Map[String, String],
>>                              windowseventtimebin: Long
>>                              )
>> 
>> The command to write data works fine (and when queried via Beeline data is 
>> correct):
>> 
>>     val hc = new HiveContext(sc)
>>     import hc.implicits._
>> 
>>     val partitioner = new HashPartitioner(5)
>>     hiveWindowsEvents.foreachRDD(rdd => {
>>       val eventsDF = rdd.toDF()
>>       eventsDF
>>         .write
>>         .mode(SaveMode.Append).saveAsTable("windows_event9")
>>     })
>> 
>> Once I add the partitioning (only a few partitions, three or fewer):
>> 
>>     val hc = new HiveContext(sc)
>>     import hc.implicits._
>> 
>>     val partitioner = new HashPartitioner(5)
>>     hiveWindowsEvents.foreachRDD(rdd => {
>>       val eventsDF = rdd.toDF()
>>       eventsDF
>>         .write
>>         .partitionBy("windowseventtimebin")
>>         .mode(SaveMode.Append).saveAsTable("windows_event9")
>>     })
>> 
>> I see the following error when writing to three partitions:
>> 
>> 15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
>>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
>>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>         at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
>>         at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
>>         at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
>>         at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
>>         at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
>>         at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
>>         at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>>         at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>>         at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>>         at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>>         at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>>         at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>>         at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>>         at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>>         at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:83)
>>         at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
>>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:530)
>>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:525)
>>         at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
>>         at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
>>         at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:525)
>>         at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:262)
>>         ... 8 more
>> 
>> I have dropped the data input volume so that it is negligible (100 events / 
>> second).  I am still seeing this OOM error.  I removed the 'map' elements in 
>> the case class above after seeing several issues with serializing maps to 
>> Parquet, and I'm still seeing the same errors.
>> 
>> The table (as created automatically by Spark HiveContext, with the maps 
>> removed) has the following:
>> 
>> 0: jdbc:hive2://localhost:10000> describe windows_event9;
>> +----------------------+------------+----------+
>> |       col_name       | data_type  | comment  |
>> +----------------------+------------+----------+
>> | targetEntity         | string     |          |
>> | targetEntityType     | string     |          |
>> | dateTimeUtc          | timestamp  |          |
>> | eventid              | string     |          |
>> | description          | string     |          |
>> | eventRecordId        | string     |          |
>> | level                | string     |          |
>> | machineName          | string     |          |
>> | sequenceNumber       | string     |          |
>> | source               | string     |          |
>> | sourceMachineName    | string     |          |
>> | taskCategory         | string     |          |
>> | user                 | string     |          |
>> | machineIp            | string     |          |
>> | windowseventtimebin  | bigint     |          |
>> +----------------------+------------+----------+
>> 
>> | SerDe Library:  org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe  |
>> | InputFormat:    org.apache.hadoop.mapred.SequenceFileInputFormat           |
>> | OutputFormat:   org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat  |
>> 
>> This seems like a pretty big bug associated with persistent tables.  Am I 
>> missing a step somewhere?
>> 
>> Thank you,
>> 
>> Bryan Jeffrey
>> 
>> 
>> 
>> On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> 
>> wrote:
> 
> [The entire original message is not included.]
