[ https://issues.apache.org/jira/browse/PARQUET-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16800515#comment-16800515 ]
Gabor Szadovszky commented on PARQUET-1549:
-------------------------------------------

What is not clear to me in your design is how the different file names/paths are generated. The current way of finalizing/padding a row group is based on the configuration and driven by the parquet-mr library. How can the act of ending the current file and starting the new one be driven by the library if it does not know the requested name? A kind of name-generator interface might help, but I am not sure it would not over-complicate the design.

But why would we need this implementation in the first place? Currently, parquet-mr handles the row groups (in the different blocks) in parallel (processed on different nodes) by using Hadoop InputSplits. This way it does not matter whether a row group is a separate file or only a separate HDFS block of the same file. If Impala cannot handle row groups similarly then, I think, it is a lack of functionality on the Impala side and not on the parquet-mr side.

> Option for one block per file in MapReduce output
> -------------------------------------------------
>
>                 Key: PARQUET-1549
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1549
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-mr
>    Affects Versions: 1.10.0
>            Reporter: Gustavo Figueiredo
>            Priority: Minor
>
> When we create PARQUET files using a MapReduce application with the current
> ParquetOutputFormat implementation, we don't have any option to reliably
> limit the number of blocks (row groups) we want to generate per file.
> The implemented configuration option 'parquet.block.size'
> (ParquetOutputFormat.BLOCK_SIZE) refers to the amount of data that goes into
> one block of data, but there are no guarantees that this will be the only
> block in a file. If one sets this configuration option to a very high value,
> it's likely there will be a single block per PARQUET file.
> However, this approach might lead to undesirably big files, so it would not
> be a good option in some scenarios.
> This behaviour can't be achieved in the client's 'mapper' either. Although
> there are some helpful classes in the Hadoop API, such as 'MultipleOutputs',
> we don't have enough information available in the 'mapper' code to have this
> kind of control, unless one uses unsafe 'hacks' to gather information from
> private fields.
> For instance, suppose we have an ETL application that loads data from HBASE
> regions (there might be one or more MAPs per region) and produces PARQUET
> files to be consumed in IMPALA tables (there might be one or more PARQUET
> files per MAP task). To simplify, let's say there is no 'REDUCE' task in this
> application. For concreteness, let's say one could use
> 'org.apache.hadoop.hbase.mapreduce.TableInputFormat' as input and
> 'org.apache.parquet.hadoop.ParquetOutputFormat' as output.
> Following the guidelines for maximum query performance of Impala queries in
> the HADOOP ecosystem, each PARQUET file should be approximately equal in size
> to an HDFS block, and there should be only one single block of data (row
> group) in each of them (see
> https://impala.apache.org/docs/build/html/topics/impala_perf_cookbook.html#perf_cookbook__perf_cookbook_parquet_block_size).
> Currently we are only able to approximate this by trial and error with
> different configuration options.
> It would be nice to have a new boolean configuration option (let's call it
> 'parquet.split.file.per.block') related to the existing
> 'parquet.block.size'. If it's set to false (the default value), we would keep
> the current behaviour. If it's set to true, we would have a different PARQUET
> file generated for each 'block' created, all coming from the same
> ParquetRecordWriter.
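The proposed semantics can be sketched without any Parquet or Hadoop dependency. The toy writer below is not parquet-mr code: it buffers plain-text "records", and once the buffered bytes reach the configured block size it closes the current file and starts a new one with an incremented suffix, so each output file ends up holding exactly one "block". The class and method names (SplittingWriter, rollToNewFile) are hypothetical; the rolling check plays the role of checkBlockSizeReached, and rollToNewFile plays the role of the proposed startNewFile.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class SplittingWriter implements AutoCloseable {
    private final Path dir;
    private final String baseName;
    private final long blockSize;
    private final List<Path> files = new ArrayList<>();
    private StringBuilder buffer = new StringBuilder();
    private int fileIndex = 0;

    SplittingWriter(Path dir, String baseName, long blockSize) {
        this.dir = dir;
        this.baseName = baseName;
        this.blockSize = blockSize;
    }

    void write(String record) {
        buffer.append(record).append('\n');
        if (buffer.length() >= blockSize) {  // analogous to checkBlockSizeReached
            rollToNewFile();                 // analogous to the proposed startNewFile
        }
    }

    // Close the current "block" and start a new file with an incremented suffix.
    private void rollToNewFile() {
        try {
            Path out = dir.resolve(baseName + "-" + fileIndex++ + ".part");
            Files.write(out, buffer.toString().getBytes(StandardCharsets.UTF_8));
            files.add(out);
            buffer = new StringBuilder();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        if (buffer.length() > 0) rollToNewFile();  // flush the last partial block
    }

    List<Path> files() {
        return files;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("split-demo");
        SplittingWriter w = new SplittingWriter(tmp, "part", 64);
        for (int i = 0; i < 10; i++) {
            w.write("record-" + i + "-xxxxxxxxxxxxxxxx");  // 25 bytes + newline
        }
        w.close();
        System.out.println(w.files().size());  // prints 4
    }
}
```

In the real proposal the file-name generation would have to be defined by parquet-mr (the concern Gabor raises above); here it is simply a numeric suffix.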
> In doing so, we would only have to worry about tuning the
> 'parquet.block.size' parameter in order to generate PARQUET files with one
> single block per file whose size is close to the configured HDFS block size.
>
> In order to implement this new feature, we only need to change a few classes
> in the 'org.apache.parquet.hadoop' package, namely:
> InternalParquetRecordWriter
> ParquetFileWriter
> ParquetOutputFormat
> ParquetRecordWriter
>
> Briefly, these are the changes needed:
>
> InternalParquetRecordWriter:
> The field 'ParquetFileWriter parquetFileWriter' should no longer be 'final',
> since we want to be able to replace it throughout the task.
> The method 'checkBlockSizeReached' should call a new method 'startNewFile'
> just after the call to 'flushRowGroupToStore'.
> The new method 'startNewFile' should contain all the logic for closing the
> current file and starting a new one at the same location with a proper
> filename.
>
> ParquetFileWriter:
> The constructor argument 'OutputFile file' should be persisted in a new
> member field and exposed by a new public method. This information is useful
> for the 'startNewFile' implementation mentioned above.
> The field 'MessageType schema' should be exposed by a new public method.
> This information is also useful for the 'startNewFile' implementation.
>
> ParquetOutputFormat:
> The existing private method 'getMaxPaddingSize' should be made 'public', or
> at least package-private. This information is useful for the 'startNewFile'
> implementation mentioned above.
> The new configuration option 'parquet.split.file.per.block' should be
> declared here like the other ones. The new behaviour in
> 'InternalParquetRecordWriter' is conditioned on this configuration option.
>
> ParquetRecordWriter:
> Simply pass the configuration option on to the internal
> InternalParquetRecordWriter instance.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)